zhaojs
2023-05-16 ea24ddd0b978cbd3b0a900711b49b8a9c2db4186
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
 
namespace think\queue\command;
 
use think\console\Command;
use think\console\Input;
use think\console\input\Option;
use think\console\Output;
use think\queue\Listener;
 
class Listen extends Command
{
    /** @var  Listener */
    protected $listener;
 
    public function configure()
    {
        $this->setName('queue:listen')
            ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on', null)
            ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0)
            ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128)
            ->addOption('timeout', null, Option::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60)
            ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3)
            ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0)
            ->setDescription('Listen to a given queue');
    }
 
    public function initialize(Input $input, Output $output)
    {
        $this->listener = new Listener($this->findCommandPath());
        $this->listener->setSleep($input->getOption('sleep'));
        $this->listener->setMaxTries($input->getOption('tries'));
 
        $this->listener->setOutputHandler(function ($type, $line) use ($output) {
            $output->write($line);
        });
    }
 
    public function execute(Input $input, Output $output)
    {
        $delay = $input->getOption('delay');
 
        $memory = $input->getOption('memory');
 
        $timeout = $input->getOption('timeout');
 
        $queue = $input->getOption('queue') ?: 'default';
 
        $this->listener->listen($queue, $delay, $memory, $timeout);
    }
    
    protected function findCommandPath()
    {
        return defined('ROOT_PATH') ? ROOT_PATH : dirname($_SERVER['argv'][0]);
    }
}