<?php
|
// +----------------------------------------------------------------------
|
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
|
// +----------------------------------------------------------------------
|
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
|
// +----------------------------------------------------------------------
|
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
|
// +----------------------------------------------------------------------
|
// | Author: yunwuxin <448901948@qq.com>
|
// +----------------------------------------------------------------------
|
|
namespace think\queue;
|
|
use Exception;
|
use think\Hook;
|
use think\Queue;
|
|
class Worker
|
{
|
|
/**
|
* 执行下个任务
|
* @param string $queue
|
* @param int $delay
|
* @param int $sleep
|
* @param int $maxTries
|
* @return array
|
*/
|
public function pop($queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
|
{
|
|
$job = $this->getNextJob($queue);
|
|
if (!is_null($job)) {
|
Hook::listen('worker_before_process', $queue);
|
return $this->process($job, $maxTries, $delay);
|
}
|
|
Hook::listen('worker_before_sleep', $queue);
|
$this->sleep($sleep);
|
|
return ['job' => null, 'failed' => false];
|
}
|
|
/**
|
* 获取下个任务
|
* @param string $queue
|
* @return Job
|
*/
|
protected function getNextJob($queue)
|
{
|
if (is_null($queue)) {
|
return Queue::pop();
|
}
|
|
foreach (explode(',', $queue) as $queue) {
|
if (!is_null($job = Queue::pop($queue))) {
|
return $job;
|
}
|
}
|
}
|
|
/**
|
* Process a given job from the queue.
|
* @param \think\queue\Job $job
|
* @param int $maxTries
|
* @param int $delay
|
* @return array
|
* @throws Exception
|
*/
|
public function process(Job $job, $maxTries = 0, $delay = 0)
|
{
|
if ($maxTries > 0 && $job->attempts() > $maxTries) {
|
return $this->logFailedJob($job);
|
}
|
|
try {
|
$job->fire();
|
|
return ['job' => $job, 'failed' => false];
|
} catch (Exception $e) {
|
if (!$job->isDeleted()) {
|
$job->release($delay);
|
}
|
|
throw $e;
|
}
|
}
|
|
/**
|
* Log a failed job into storage.
|
* @param \Think\Queue\Job $job
|
* @return array
|
*/
|
protected function logFailedJob(Job $job)
|
{
|
if (!$job->isDeleted()) {
|
try {
|
$job->delete();
|
$job->failed();
|
} finally {
|
Hook::listen('queue_failed', $job);
|
}
|
}
|
|
return ['job' => $job, 'failed' => true];
|
}
|
|
/**
|
* Sleep the script for a given number of seconds.
|
* @param int $seconds
|
* @return void
|
*/
|
public function sleep($seconds)
|
{
|
sleep($seconds);
|
}
|
|
}
|