// +---------------------------------------------------------------------- namespace think\queue\connector; use think\exception\HttpException; use think\queue\Connector; use think\Request; use think\queue\job\Topthink as TopthinkJob; use think\Response; class Topthink extends Connector { protected $options = [ 'token' => '', 'project_id' => '', 'protocol' => 'https', 'host' => 'qns.topthink.com', 'port' => 443, 'api_version' => 1, 'max_retries' => 3, 'default' => 'default' ]; /** @var Request */ protected $request; protected $url; protected $curl = null; protected $last_status; protected $headers = []; public function __construct($options) { if (!empty($options)) { $this->options = array_merge($this->options, $options); } $this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/"; $this->headers['Authorization'] = "Bearer {$this->options['token']}"; $this->request = Request::instance(); } public function push($job, $data = '', $queue = null) { return $this->pushRaw(0, $queue, $this->createPayload($job, $data)); } public function later($delay, $job, $data = '', $queue = null) { return $this->pushRaw($delay, $queue, $this->createPayload($job, $data)); } public function release($queue, $job, $delay) { return $this->pushRaw($delay, $queue, $job->payload, $job->attempts); } public function marshal() { $job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue')); if ($this->request->header('topthink-message-status') == 'success') { $job->fire(); } else { $job->failed(); } return new Response('OK'); } public function pushRaw($delay, $queue, $payload, $attempts = 0) { $queue_name = $this->getQueue($queue); $queue = rawurlencode($queue_name); $url = "project/{$this->options['project_id']}/queue/{$queue}/message"; $message = [ 'payload' => $payload, 'attempts' => $attempts, 'delay' => $delay ]; return $this->apiCall('POST', $url, $message)->id; } public function deleteMessage($queue, $id) { $queue = rawurlencode($queue); $url = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}"; return $this->apiCall('DELETE', $url); } protected function apiCall($type, $url, $params = []) { $url = "{$this->url}$url"; if ($this->curl == null) { $this->curl = curl_init(); } switch ($type = strtoupper($type)) { case 'DELETE': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); break; case 'PUT': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); break; case 'POST': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POST, true); curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params); break; case 'GET': curl_setopt($this->curl, CURLOPT_POSTFIELDS, null); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_HTTPGET, true); $url .= '?' . http_build_query($params); curl_setopt($this->curl, CURLOPT_URL, $url); break; } curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true); $headers = []; foreach ($this->headers as $k => $v) { if ($k == 'Connection') { $v = 'Close'; } $headers[] = "$k: $v"; } curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers); curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10); return $this->callWithRetries(); } protected function callWithRetries() { for ($retry = 0; $retry < $this->options['max_retries']; $retry++) { $out = curl_exec($this->curl); if ($out === false) { $this->reportHttpError(0, curl_error($this->curl)); } $this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE); if ($this->last_status >= 200 && $this->last_status < 300) { return self::jsonDecode($out); } elseif ($this->last_status >= 500) { self::waitRandomInterval($retry); } else { $this->reportHttpError($this->last_status, $out); } } $this->reportHttpError($this->last_status, "Service unavailable"); return; } protected static function jsonDecode($response) { $data = json_decode($response); $json_error = json_last_error(); if ($json_error != JSON_ERROR_NONE) { throw new \RuntimeException($json_error); } return $data; } protected static function waitRandomInterval($retry) { $max_delay = pow(4, $retry) * 100 * 1000; usleep(rand(0, $max_delay)); } protected function reportHttpError($status, $text) { throw new HttpException($status, "http error: {$status} | {$text}"); } /** * Marshal out the pushed job and payload. * * @return object */ protected function marshalPushedJob() { return (object) [ 'id' => $this->request->header('topthink-message-id'), 'payload' => $this->request->getContent(), 'attempts' => $this->request->header('topthink-message-attempts') ]; } public function __destruct() { if ($this->curl != null) { curl_close($this->curl); $this->curl = null; } } public function pop($queue = null) { throw new \RuntimeException('pop queues not support for this type'); } }