PHP的轻量消息队列php-resque使用说明

Posted 囊萤映雪ZBY

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP的轻量消息队列php-resque使用说明相关的知识,希望对你有一定的参考价值。

php的轻量消息队列php-resque使用说明

消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本。
一个Worker,可以处理一个队列,也可以处理很多个队列,并且可以通过增加Worker的进程/线程数来加快队列的执行速度。

job.php文件
<?php
class PHP_Job
{
    public function perform()
    {
        $param = $this->args;
        $file='phone.txt';
          if(false!==fopen($file,'w+')){
            file_put_contents($file,serialize($param));//写入缓存
         }
    }
}
queue.php文件
<?php
if(empty($argv[1])) {
    die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
}

require __DIR__ . '/init.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');

// You can also use a DSN-style format:
//Resque::setBackend('redis://user:pass@127.0.0.1:6379');
//Resque::setBackend('redis://user:pass@a.host.name:3432/2');

$args = array(
    'time' => time(),
    'array' => array(
        'test' => 'test',
    ),
);
if (empty($argv[2])) {
    $jobId = Resque::enqueue('default', $argv[1], $args, true);
} else {
        $jobId = Resque::enqueue($argv[1], $argv[2], $args, true);    
}
 echo "Queued job ".$jobId."\n\n";
resque.php
<?php
date_default_timezone_set('GMT');
require 'bad_job.php';
require 'job.php';
require 'php_error_job.php';

require '../bin/resque';

总结使用过程:
queue.php 文件功能(参数)
终端执行命令如下:
php queue.php PHP_Job (将queue.php文件中的参数传递 PHP_Job ,生成队列文本)

QUEUE=default VVERBOSE=1 php resque.php 执行队列文本作作务

前面的QUEUE部分是设置环境变量,我们指定当前的Worker只负责处理default队列。

default 可以根据自己定义队列名称

实际操作实例如下:

android_device', null, null, '跑安卓脚本');
        $this-&gt;addOption('run_android_his_device', null, null, '跑安卓历史脚本');
        $this-&gt;addOption('run_ios_add', null, null, 'IOS添加增量');
        $this-&gt;addOption('run_ios_his_add', null, null, 'IOS历史增量');
        $this-&gt;addArgument('pid', null, '平台号', 0);
        $this-&gt;addArgument('start', null, '开始条数', 0);
        $this-&gt;addArgument('limit', null, '结束条数', 10000000);
    }

    //执行路口
    protected function execute(Input $input, Output $output) {
        //安卓
        //sleep(60*30);
        //exit("Time out!");
        $pid = $input-&gt;getArgument('pid');
        $start = $input-&gt;getArgument('start');
        $limit = $input-&gt;getArgument('limit');
        $configures = Config::get('command');

        $this-&gt;config_args =  $configures[$pid];

        echo date("Y-m-d H:i:s");
        var_dump($input-&gt;getArguments());
        var_dump($input-&gt;getOptions());
        try {
            if ($input-&gt;hasOption('run_android_device')) {
                $this-&gt;playDeviceAndroid($pid, $start, $limit);
            }
        } catch (\Exception $e){
            $this-&gt;taskExceptionNotice($e);
            throw $e;
        }
    }



    public function playDeviceAndroid($pid, $start, $limit) {
        $redis_conf = Config::get('redis_config');
        \Resque::setBackend($redis_conf['server']);

        $current_time = time();
        $times = date('Y-m-d',($current_time-28800)).'T'.date('H:i:s',($current_time-28800)).'.'.rand(101,999).'Z';

        $ctrl_max = $this-&gt;config_args['ctrl_max'];
        $ctrl_counter = 0;
        $bucket = [];

        $list = db('device_android_tmp')-&gt;where(array('type'=&gt;1,'pid'=&gt;$pid,'platform'=&gt;1))-&gt;order('id asc')-&gt;limit($start, $limit)-&gt;select();
        foreach ($list as $k =&gt; $v) {
            if (date('Ymd',$current_time) == date('Ymd',$v['create_at'])) {
                $isFirst = 'true';
            }else{
                $isFirst = 'false';
            }
            echo $k;


            for ($i=0;$i&lt;$this-&gt;config_args['item_rate'];$i++) {
                $tmp = [];
                $tmp['ikey'] = $this-&gt;config_args['iKey'];
                $tmp['info'] = $v;
                $tmp['isFirst'] = $isFirst;
                $tmp['times'] = $times;
                $tmp['type'] = 'Android';
                $tmp['pid'] = $pid;
                $tmp['sdk'] = 'android';
                $bucket[] = $tmp;

                ++$ctrl_counter;
                if($ctrl_counter == $ctrl_max){
                    \Resque::enqueue($this-&gt;queue, $this-&gt;job_class_name, $bucket, true);  // $bucket
                    $bucket = [];
                    $ctrl_counter = 0;
                }

            }
        }

        if(count($bucket)){
            \Resque::enqueue($this-&gt;queue, $this-&gt;job_class_name, $bucket, true);    // $bucket
        }
        echo "\r\n".date("Y-m-d H:i:s"). count($list) . "\r\n++++++++++++++++++++++++++++\r\n";
        unset($list);   
    }

}
">RunDevice.php 文件部份内容如下: <?php namespace app\command; use think\console\Command; use think\console\Input; use think\console\Output; use think\Db; use think\Config; class RunDevice extends BaseCommand {    private $config_args;    private $queue = 'analysis';   // 定义队列名称    private $job_class_name = 'PushDevice';    // 定义队列任务执行的类名    public function __construct() {        set_time_limit(0);        ini_set('memory_limit', '-1');        ini_set('default_socket_timeout', -1);        ini_set('display_errors', 1);        parent::__construct();    }    protected function configure() {        $this->setName('rundevice')->setDescription('跑安卓脚本');        $this->addOption('run_android_device', null, null, '跑安卓脚本');        $this->addOption('run_android_his_device', null, null, '跑安卓历史脚本');        $this->addOption('run_ios_add', null, null, 'IOS添加增量');        $this->addOption('run_ios_his_add', null, null, 'IOS历史增量');        $this->addArgument('pid', null, '平台号', 0);        $this->addArgument('start', null, '开始条数', 0);        $this->addArgument('limit', null, '结束条数', 10000000);    }    //执行路口    protected function execute(Input $input, Output $output) {        //安卓        //sleep(60*30);        //exit("Time out!");        $pid = $input->getArgument('pid');        $start = $input->getArgument('start');        $limit = $input->getArgument('limit');        $configures = Config::get('command');        $this->config_args =  $configures[$pid];        echo date("Y-m-d H:i:s");        var_dump($input->getArguments());        var_dump($input->getOptions());        try {            if ($input->hasOption('run_android_device')) {                $this->playDeviceAndroid($pid, $start, $limit);            }        } catch (\Exception $e){            $this->taskExceptionNotice($e);            throw $e;        }    }    public function playDeviceAndroid($pid, $start, $limit) {        $redis_conf = Config::get('redis_config');        \Resque::setBackend($redis_conf['server']);        $current_time = time();        $times = date('Y-m-d',($current_time-28800)).'T'.date('H:i:s',($current_time-28800)).'.'.rand(101,999).'Z';        $ctrl_max = $this->config_args['ctrl_max'];        $ctrl_counter = 0;        $bucket = [];        $list = db('device_android_tmp')->where(array('type'=>1,'pid'=>$pid,'platform'=>1))->order('id asc')->limit($start, $limit)->select();        foreach ($list as $k => $v) {            if (date('Ymd',$current_time) == date('Ymd',$v['create_at'])) {                $isFirst = 'true';            }else{                $isFirst = 'false';            }            echo $k;            for ($i=0;$i<$this->config_args['item_rate'];$i++) {                $tmp = [];                $tmp['ikey'] = $this->config_args['iKey'];                $tmp['info'] = $v;                $tmp['isFirst'] = $isFirst;                $tmp['times'] = $times;                $tmp['type'] = 'Android';                $tmp['pid'] = $pid;                $tmp['sdk'] = 'android';                $bucket[] = $tmp;                ++$ctrl_counter;                if($ctrl_counter == $ctrl_max){                    \Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);  // $bucket                    $bucket = [];                    $ctrl_counter = 0;                }            }        }        if(count($bucket)){            \Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);    // $bucket        }        echo "\r\n".date("Y-m-d H:i:s"). count($list) . "\r\n++++++++++++++++++++++++++++\r\n";        unset($list);      } }

\Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);
生成队列文本(定义队列名称,任务执行的类名, 参数,)

demo:
$jobId = Resque::enqueue(‘default’, $argv[1], $args, true);

$jobId = Resque::enqueue(‘test’, $argv[1], $args, true);
QUEUE=test php resque.php 此时的队列名称为 test

终端执行:
php demo/queue.php test PHP_job
另一个终终端执行 (处理队列如下:)
QUEUE=test php resque.php


job.php 需要执行的
<?php
class PushDevice
{
    public function perform()
    {
        $param = $this->args;
        if ($param){
            $ch = curl_init();
            curl_setopt($ch, CURLOPT_URL, 'https://gate.hockeyapp.net/v2/track');
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt($ch, CURLOPT_TIMEOUT,2);   //设置curl超时时间60秒
            // post数据
            curl_setopt($ch, CURLOPT_POST, 1);

            foreach ($param as $v) {
                $data = [
                    'data'=>['baseData'=>['state' => 0, 'ver' =>2],
                        'baseType'=>'SessionStateData'],
                    'iKey' => $v['ikey'],
                    'name' => 'Microsoft.ApplicationInsights.SessionState',
                    'sampleRate' => 100,
                    'tags' => [
                        'ai.application.ver' => '1.0 (1)',
                        'ai.device.id' => $v['info']['device_id'],
                        'ai.device.language' => $v['info']['language'],
                        'ai.device.locale' => $v['info']['country'],
                        'ai.device.model' => $v['info']['device_model'],
                        'ai.device.oemName' => $v['info']['device_brand'],
                        'ai.device.os' => $v['type'].' OS',
                        'ai.device.osVersion' => $v['info']['device_os_version'],
                        'ai.device.screenResolution' => $v['info']['resolution'],
                        'ai.device.type' => 'Phone',
                        'ai.internal.sdkVersion' => $v['sdk'].$v['info']['device_os_version'],
                        'ai.session.id' => $v['session_id'],
                        'ai.session.isFirst' => $v['isFirst'],
                        'ai.session.isNew' => 'true',
                        "ai.user.id" => $v['info']['user_id'],
                    ],
                    'time' => $v['times'],
                    'ver' => 1
                ];

                // post的变量
                curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));

                curl_exec($ch);

                 //$this->curlPost(json_encode($data));
//                $res = json_decode($response,true);
//                if ($res['itemsAccepted'] == 1 || $res['itemsReceived'] == 1 ) {
//                    file_put_contents("log.txt", $response.PHP_EOL, FILE_APPEND);
//                }

            }
            curl_close($ch);
        }
    }

    public function create_uuid() {
        if (function_exists ( 'com_create_guid' )) {
            return com_create_guid ();
        } else {
            mt_srand ( ( double ) microtime () * 10000 ); //optional for php 4.2.0 and up.随便数播种,4.2.0以后不需要了。
            $charid = strtoupper ( md5 ( uniqid ( rand (), true ) ) ); //根据当前时间(微秒计)生成唯一id.
            $hyphen = chr ( 45 ); // "-"
            $uuid = '' . //chr(123)// "{"
                substr ( $charid, 0, 8 ) . $hyphen . substr ( $charid, 8, 4 ) . $hyphen . substr ( $charid, 12, 4 ) . $hyphen . substr ( $charid, 16, 4 ) . $hyphen . substr ( $charid, 20, 12 );

            unset($hyphen, $charid);
            return $uuid;
        }
    }


    public function curlPost($body) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, 'https://gate.hockeyapp.net/v2/track');
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($ch, CURLOPT_TIMEOUT,5);   //设置curl超时时间60秒
        // post数据
        curl_setopt($ch, CURLOPT_POST, 1);
        // post的变量
        curl_setopt($ch, CURLOPT_POSTFIELDS, $body);

        $output = curl_exec($ch);
        curl_close($ch);
        return $output;
    }
}

执行如下:
QUEUE=analysis php resque.php
QUEUE=analysis VVERBOSE=1 php ./resque.php
自带web 界面

https://www.jianshu.com/p/395652dc66f1
可是我的mac 在操作gem 命令时,一直提示 ruby 版本太低
升级 ruby brew install ruby

Mac 上brew install ruby 成功安装ruby-2.2.3 但是打ruby -v 依然显示2.0.0

用brew install ruby 会安装在/usr/local/Cellar/ruby/2.2.3/bin/ruby路径下

ruby -v
ruby 2.0.0p481 (2014-05-08 revision 45883) [universal.x86_64-darwin14]
@caoh ➜ ~ /usr/local/Cellar/ruby/2.2.3/bin/ruby -v
ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-darwin14]

resque-web -p 40000 线上需要开通40000端口号,可以自己指定某个端开口

generated by haroopad

 

以上是关于PHP的轻量消息队列php-resque使用说明的主要内容,如果未能解决你的问题,请参考以下文章

PHP-RESQUE重试机制

centos下的activemq的配置及PHP的使用

PHP + Redis 实现消息队列

redis消息队列有没有

消息队列_Beanstalkd-0001.Beanstalkd之轻量级分布式内存队列部署?

限时领奖消息队列MNS训练营重磅来袭,边学习充电,边领充电宝~