thinkphp5 消息队列thinkphp-queue扩展
Posted myvic
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了thinkphp5 消息队列thinkphp-queue扩展相关的知识,希望对你有一定的参考价值。
1.简介
thinkphp-queue是thinkphp的一个第三方扩展, 内置了 Redis,Database,Topthink ,Sync这四种驱动,推荐使用redis
2. 下载 和安装
composer require topthink/think-queue
配置目录在: application/extra/queue.php
return [ \'connector\' => \'Redis\', // Redis 驱动 \'expire\' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null \'default\' => \'default\', // 默认的队列名称 \'host\' => \'127.0.0.1\', // redis 主机ip \'port\' => 6379, // redis 端口 \'password\' => \'\', // redis 密码 \'select\' => 0, // 使用哪一个 db,默认为 db0 \'timeout\' => 0, // redis连接的超时时间 \'persistent\' => false, // 是否是长连接 // \'connector\' => \'Database\', // 数据库驱动 // \'expire\' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null // \'default\' => \'default\', // 默认的队列名称 // \'table\' => \'jobs\', // 存储消息的表名,不带前缀 // \'dsn\' => [], // \'connector\' => \'Topthink\', // ThinkPHP内部的队列通知服务平台 ,本文不作介绍 // \'token\' => \'\', // \'project_id\' => \'\', // \'protocol\' => \'https\', // \'host\' => \'qns.topthink.com\', // \'port\' => 443, // \'api_version\' => 1, // \'max_retries\' => 3, // \'default\' => \'default\', // \'connector\' => \'Sync\', // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行 ];
3.入队,创建队列的代码
/** * 文件路径: \\application\\index\\controller\\JobTest.php * 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列 */ namespace application\\index\\controller; use think\\Exception; use think\\Queue; class JobTest { /** * 一个使用了队列的 action */ public function actionWithHelloJob(){ // 1.当前任务将由哪个类来负责处理。 // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 $jobHandlerClassName = \'application\\index\\job\\Hello\'; // 2.当前任务归属的队列名称,如果为新队列,会自动创建 $jobQueueName = "helloJobQueue"; // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串 // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对) $jobData = [ \'ts\' => time(), \'bizId\' => uniqid() , \'a\' => 1 ] ; // 4.将该任务推送到消息队列,等待对应的消费者去执行 $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false if( $isPushed !== false ){ echo date(\'Y-m-d H:i:s\') . " a new Hello Job is Pushed to the MQ"."<br>"; }else{ echo \'Oops, something went wrong.\'; } } }
如果是多个任务:如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\\lib\\job\\Job2@task1
、app\\lib\\job\\Job2@task2
4.消费队列的代码:
<?php namespace app\\test\\job; use think\\queue\\Job; class Hello { /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array|mixed $data 发布任务时自定义的数据 */ public function fire(Job $job,$data){ // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //如果任务执行成功, 记得删除任务 $job->delete(); print("<info>Hello Job has been done and deleted"."</info>\\n"); }else{ if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 print("<warn>Hello Job has been retried more than 3 times!"."</warn>\\n"); $job->delete(); // 也可以重新发布这个任务 //print("<info>Hello Job will be availabe again after 2s."."</info>\\n"); //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * 有些消息在到达消费者时,可能已经不再需要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根据消息中的数据进行实际的业务处理 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function doHelloJob($data) { // 根据消息中的数据进行实际的业务处理... var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \\n"); // print("<info>Hello Job is Fired at " . date(\'Y-m-d H:i:s\') ."</info> \\n"); // print("<info>Hello Job is Done!"."</info> \\n"); return true; } }
执行代码:
php think queue:work --queue helloJobQueue
5.总结:
5.1 命名:
-
queue:subscribe 命令 [截至2017-02-15,作者暂未实现该模式,略过]
-
queue:work 命令
work 命令: 该命令将启动一个 work 进程来处理消息队列。
php think queue:work --queue helloJobQueue
-
queue:listen 命令
listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过
proc_open(‘php think queue:work’)
的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。php think queue:listen --queue helloJobQueue
queue:restart 重新开启
参考资料:https://blog.csdn.net/will5451/article/details/80434174
https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645
https://github.com/top-think/think-queue
以上是关于thinkphp5 消息队列thinkphp-queue扩展的主要内容,如果未能解决你的问题,请参考以下文章