thinkphp5 消息队列thinkphp-queue扩展

Posted myvic

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了thinkphp5 消息队列thinkphp-queue扩展相关的知识,希望对你有一定的参考价值。

1.简介

thinkphp-queue是thinkphp的一个第三方扩展, 内置了 Redis,Database,Topthink ,Sync这四种驱动,推荐使用redis

2. 下载 和安装

composer require topthink/think-queue

配置目录在: application/extra/queue.php

return [
    \'connector\'  => \'Redis\',            // Redis 驱动
    \'expire\'     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    \'default\'    => \'default\',        // 默认的队列名称
    \'host\'       => \'127.0.0.1\',        // redis 主机ip
    \'port\'       => 6379,            // redis 端口
    \'password\'   => \'\',                // redis 密码
    \'select\'     => 0,                // 使用哪一个 db,默认为 db0
    \'timeout\'    => 0,                // redis连接的超时时间
    \'persistent\' => false,            // 是否是长连接

    //    \'connector\' => \'Database\',   // 数据库驱动
    //    \'expire\'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    //    \'default\'   => \'default\',    // 默认的队列名称
    //    \'table\'     => \'jobs\',       // 存储消息的表名,不带前缀
    //    \'dsn\'       => [],

    //    \'connector\'   => \'Topthink\',    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
    //    \'token\'       => \'\',
    //    \'project_id\'  => \'\',
    //    \'protocol\'    => \'https\',
    //    \'host\'        => \'qns.topthink.com\',
    //    \'port\'        => 443,
    //    \'api_version\' => 1,
    //    \'max_retries\' => 3,
    //    \'default\'     => \'default\',

//        \'connector\'   => \'Sync\',        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];

3.入队,创建队列的代码

/**
* 文件路径: \\application\\index\\controller\\JobTest.php
* 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列
*/
namespace application\\index\\controller;
  use think\\Exception;

  use think\\Queue;

  class JobTest {
  /**
   * 一个使用了队列的 action
   */
  public function actionWithHelloJob(){
      
      // 1.当前任务将由哪个类来负责处理。 
      //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
      $jobHandlerClassName  = \'application\\index\\job\\Hello\'; 
      // 2.当前任务归属的队列名称,如果为新队列,会自动创建
      $jobQueueName        = "helloJobQueue"; 
      // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
      //   ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
      $jobData             = [ \'ts\' => time(), \'bizId\' => uniqid() , \'a\' => 1 ] ;
      // 4.将该任务推送到消息队列,等待对应的消费者去执行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );    
      // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
      if( $isPushed !== false ){  
          echo date(\'Y-m-d H:i:s\') . " a new Hello Job is Pushed to the MQ"."<br>";
      }else{
          echo \'Oops, something went wrong.\';
      }
  }
 }

如果是多个任务:如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\\lib\\job\\Job2@task1app\\lib\\job\\Job2@task2

4.消费队列的代码:

<?php

namespace app\\test\\job;


use think\\queue\\Job;

class Hello {

    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job,$data){
        // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!isJobStillNeedToBeDone){
            $job->delete();
            return;
        }

        $isJobDone = $this->doHelloJob($data);

        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
            print("<info>Hello Job has been done and deleted"."</info>\\n");
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print("<warn>Hello Job has been retried more than 3 times!"."</warn>\\n");
                $job->delete();
                // 也可以重新发布这个任务
                //print("<info>Hello Job will be availabe again after 2s."."</info>\\n");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }

    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }

    /**
     * 根据消息中的数据进行实际的业务处理
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function doHelloJob($data) {
        // 根据消息中的数据进行实际的业务处理...
        var_dump($data);
//        print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \\n");
//        print("<info>Hello Job is Fired at " . date(\'Y-m-d H:i:s\') ."</info> \\n");
//        print("<info>Hello Job is Done!"."</info> \\n");

        return true;
    }
}

执行代码:

php think queue:work --queue helloJobQueue

5.总结:

5.1 命名:

  • queue:subscribe 命令 [截至2017-02-15,作者暂未实现该模式,略过]

  • queue:work 命令

    work 命令: 该命令将启动一个 work 进程来处理消息队列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue helloJobQueue

    queue:restart 重新开启

     

     

    参考资料:https://blog.csdn.net/will5451/article/details/80434174

    https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

    https://github.com/top-think/think-queue

     

以上是关于thinkphp5 消息队列thinkphp-queue扩展的主要内容,如果未能解决你的问题,请参考以下文章

ThinkPHP think-queue 消息队列

ThinkPHP think-queue 消息队列

Thinkphp5结合layer弹窗 定制操作结果页面

网站有漏洞被***该怎么解决和修复

8.windows消息机制消息队列

如何清空Windows消息队列