PHP的轻量消息队列php-resque使用说明
Posted 囊萤映雪ZBY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP的轻量消息队列php-resque使用说明相关的知识,希望对你有一定的参考价值。
php的轻量消息队列php-resque使用说明
消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本。
一个Worker,可以处理一个队列,也可以处理很多个队列,并且可以通过增加Worker的进程/线程数来加快队列的执行速度。
job.php文件
<?php
/**
 * Demo job: dumps its arguments, serialized, into phone.txt in the current
 * working directory.
 */
class PHP_Job
{
    /**
     * Arguments of the job, read by perform().
     * NOTE(review): php-resque is expected to assign this before calling
     * perform() - confirm against the library version in use.
     *
     * @var mixed
     */
    public $args;

    /**
     * Serialize the job arguments and write them to phone.txt.
     *
     * @return void
     * @throws RuntimeException when the file cannot be written, so the
     *                          failure is visible instead of being swallowed
     */
    public function perform()
    {
        $file = 'phone.txt';

        // file_put_contents() creates/truncates the file on its own, so no
        // separate fopen() probe is needed before writing.
        if (file_put_contents($file, serialize($this->args)) === false) {
            throw new RuntimeException("Unable to write job arguments to {$file}");
        }
    }
}
queue.php文件
<?php
// Usage:
//   php queue.php <job class>           enqueue on the "default" queue
//   php queue.php <queue> <job class>   enqueue on the given queue
if (empty($argv[1])) {
    // Usage errors go to STDERR with a non-zero status so that shells and
    // schedulers can tell the call failed (die('...') exits with status 0).
    fwrite(STDERR, 'Specify the name of a job to add. e.g, php queue.php PHP_Job' . PHP_EOL);
    exit(1);
}

require __DIR__ . '/init.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
// You can also use a DSN-style format:
//Resque::setBackend('redis://user:pass@127.0.0.1:6379');
//Resque::setBackend('redis://user:pass@a.host.name:3432/2');

// Sample payload handed to the job.
$args = array(
    'time' => time(),
    'array' => array(
        'test' => 'test',
    ),
);

// With one CLI argument it is the job class; with two, the first is the
// queue name and the second the job class.
if (empty($argv[2])) {
    $queueName = 'default';
    $jobClass = $argv[1];
} else {
    $queueName = $argv[1];
    $jobClass = $argv[2];
}

$jobId = Resque::enqueue($queueName, $jobClass, $args, true);
echo "Queued job ".$jobId."\n\n";
resque.php
<?php
// Make every date/time function called from here on use GMT.
date_default_timezone_set('GMT');
// Load the job class files first, so they are already included when the
// launcher below is pulled in. job.php is the file that defines PHP_Job.
// NOTE(review): bad_job.php and php_error_job.php are not shown here -
// presumably further demo jobs; confirm.
require 'bad_job.php';
require 'job.php';
require 'php_error_job.php';
// Hand over to the php-resque launcher script.
// NOTE(review): presumably this reads QUEUE / VVERBOSE from the environment
// and starts the worker - confirm against bin/resque.
require '../bin/resque';
总结使用过程:
queue.php 文件功能(参数)
终端执行命令如下:
php queue.php PHP_Job (将queue.php文件中的参数传递 PHP_Job ,生成队列文本)
QUEUE=default VVERBOSE=1 php resque.php 执行队列中的任务
前面的QUEUE部分是设置环境变量,我们指定当前的Worker只负责处理default队列。
default 可以根据自己定义队列名称
实际操作实例如下:
RunDevice.php 文件部分内容如下:
<?php
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Db;
use think\Config;
/**
 * Console command "rundevice": reads rows from the device_android_tmp table
 * and pushes them, in batches, onto a Resque queue.
 */
class RunDevice extends BaseCommand {
    /** @var array|null Settings for the requested pid, taken from the 'command' config in execute(). */
    private $config_args;
    /** @var string Name of the Resque queue the batches are pushed to. */
    private $queue = 'analysis';
    /** @var string Class name of the job that is enqueued for every batch. */
    private $job_class_name = 'PushDevice';

    /**
     * Remove the execution-time, memory and socket-timeout limits, switch on
     * error display, then run the parent constructor.
     */
    public function __construct() {
        set_time_limit(0);
        ini_set('memory_limit', '-1');
        ini_set('default_socket_timeout', -1);
        ini_set('display_errors', 1);
        parent::__construct();
    }

    /**
     * Register the command name, its four options and its three arguments
     * (pid, start, limit) together with their defaults.
     */
    protected function configure() {
        $this->setName('rundevice')->setDescription('跑安卓脚本');
        $this->addOption('run_android_device', null, null, '跑安卓脚本');
        $this->addOption('run_android_his_device', null, null, '跑安卓历史脚本');
        $this->addOption('run_ios_add', null, null, 'IOS添加增量');
        $this->addOption('run_ios_his_add', null, null, 'IOS历史增量');
        $this->addArgument('pid', null, '平台号', 0);
        $this->addArgument('start', null, '开始条数', 0);
        $this->addArgument('limit', null, '结束条数', 10000000);
    }

    /**
     * Command entry point: load the settings for the requested pid and, when
     * the run_android_device option is present, enqueue the Android rows.
     *
     * @param Input  $input
     * @param Output $output
     * @throws \InvalidArgumentException when the 'command' config has no entry for the pid
     * @throws \Exception any exception from the enqueue run, re-thrown after
     *                    taskExceptionNotice() has been called with it
     */
    protected function execute(Input $input, Output $output) {
        $pid = $input->getArgument('pid');
        $start = $input->getArgument('start');
        $limit = $input->getArgument('limit');

        // Fail with a clear message instead of continuing with empty settings
        // when the pid has no configuration entry.
        $configures = Config::get('command');
        if (!isset($configures[$pid])) {
            throw new \InvalidArgumentException("No 'command' configuration found for pid '{$pid}'");
        }
        $this->config_args = $configures[$pid];

        // Diagnostic output: start time plus the parsed arguments and options.
        echo date("Y-m-d H:i:s");
        var_dump($input->getArguments());
        var_dump($input->getOptions());

        try {
            if ($input->hasOption('run_android_device')) {
                $this->playDeviceAndroid($pid, $start, $limit);
            }
        } catch (\Exception $e){
            $this->taskExceptionNotice($e);
            throw $e;
        }
    }

    /**
     * Read Android device rows for a pid and enqueue them in batches.
     *
     * Every row is added 'item_rate' times; a batch is pushed as one job as
     * soon as it holds 'ctrl_max' items, and whatever is left is pushed at
     * the end. Both values come from $this->config_args, which execute()
     * fills in.
     *
     * @param mixed $pid   platform id used to filter the rows
     * @param mixed $start first argument passed to the query's limit()
     * @param mixed $limit second argument passed to the query's limit()
     */
    public function playDeviceAndroid($pid, $start, $limit) {
        $redis_conf = Config::get('redis_config');
        \Resque::setBackend($redis_conf['server']);

        $current_time = time();
        $today = date('Ymd', $current_time);
        // The value is labelled 'Z' (UTC), so format it with gmdate() instead of
        // subtracting a hard-coded 8 hours, which is only right on a UTC+8 server.
        $times = gmdate('Y-m-d\TH:i:s', $current_time).'.'.rand(101,999).'Z';

        // Loop-invariant settings, read once.
        $ctrl_max = $this->config_args['ctrl_max'];
        $item_rate = $this->config_args['item_rate'];
        $ikey = $this->config_args['iKey'];

        $bucket = [];
        $list = db('device_android_tmp')->where(array('type'=>1,'pid'=>$pid,'platform'=>1))->order('id asc')->limit($start, $limit)->select();
        foreach ($list as $k => $v) {
            // Progress output: key of the row being processed.
            echo $k;

            // The flag is sent as the string 'true'/'false': 'true' when the
            // row's create_at falls on today's date.
            $item = [
                'ikey' => $ikey,
                'info' => $v,
                'isFirst' => ($today == date('Ymd', $v['create_at'])) ? 'true' : 'false',
                'times' => $times,
                'type' => 'Android',
                'pid' => $pid,
                'sdk' => 'android',
            ];

            for ($i = 0; $i < $item_rate; $i++) {
                $bucket[] = $item;
                // A non-positive ctrl_max never triggers a flush here, so
                // everything is sent in the single final batch.
                if ($ctrl_max > 0 && count($bucket) >= $ctrl_max) {
                    \Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);
                    $bucket = [];
                }
            }
        }

        // Push the last, partially filled batch.
        if (count($bucket)) {
            \Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);
        }

        echo "\r\n".date("Y-m-d H:i:s"). count($list) . "\r\n++++++++++++++++++++++++++++\r\n";
    }
}
\Resque::enqueue($this->queue, $this->job_class_name, $bucket, true);
生成队列文本(定义队列名称,任务执行的类名, 参数,)
demo:
$jobId = Resque::enqueue('default', $argv[1], $args, true);
$jobId = Resque::enqueue('test', $argv[1], $args, true);
QUEUE=test php resque.php 此时的队列名称为 test
终端执行:
php demo/queue.php test PHP_Job
在另一个终端执行(处理队列如下:)
QUEUE=test php resque.php
job.php 需要执行的
<?php
/**
 * Queue job that POSTs one session-state telemetry record per queued item
 * to the tracking endpoint.
 */
class PushDevice
{
    /** Endpoint every record is POSTed to. */
    const TRACK_URL = 'https://gate.hockeyapp.net/v2/track';

    /**
     * Items to send, read by perform().
     * NOTE(review): php-resque is expected to assign this before calling
     * perform() - confirm against the library version in use.
     *
     * @var array|null
     */
    public $args;

    /**
     * Send every item in $this->args as a JSON body over one reused cURL
     * handle. Does nothing when there are no items. The responses are not
     * inspected, so delivery is best-effort.
     *
     * @return void
     */
    public function perform()
    {
        $items = $this->args;
        if (!$items) {
            return;
        }

        // 2-second timeout per request.
        $ch = $this->openHandle(2);
        foreach ($items as $item) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($this->buildPayload($item)));
            curl_exec($ch);
        }
        curl_close($ch);
    }

    /**
     * Build a 32-hex-digit identifier formatted 8-4-4-4-12, upper case,
     * without surrounding braces.
     *
     * @return string
     */
    public function create_uuid()
    {
        if (function_exists('com_create_guid')) {
            // com_create_guid() wraps the value in braces; strip them so both
            // code paths return the same format.
            return trim(com_create_guid(), '{}');
        }

        // No manual mt_srand(): the generator seeds itself, and reseeding
        // from the clock only narrows the range of possible values.
        $hex = strtoupper(md5(uniqid(mt_rand(), true)));

        return substr($hex, 0, 8) . '-'
            . substr($hex, 8, 4) . '-'
            . substr($hex, 12, 4) . '-'
            . substr($hex, 16, 4) . '-'
            . substr($hex, 20, 12);
    }

    /**
     * POST a ready-made body to the tracking endpoint with a 5-second timeout.
     *
     * @param string $body request body
     * @return string|bool the response body, or false when the request fails
     */
    public function curlPost($body)
    {
        $ch = $this->openHandle(5);
        curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
        $output = curl_exec($ch);
        curl_close($ch);

        return $output;
    }

    /**
     * Create a cURL handle set up for a POST to the tracking endpoint that
     * returns the response instead of printing it.
     *
     * @param int $timeoutSeconds total timeout of each request, in seconds
     * @return resource|object cURL handle
     */
    private function openHandle($timeoutSeconds)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, self::TRACK_URL);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($ch, CURLOPT_TIMEOUT, $timeoutSeconds);
        curl_setopt($ch, CURLOPT_POST, 1);

        return $ch;
    }

    /**
     * Turn one queued item into the record that is JSON-encoded and sent.
     *
     * @param array $item one element of $this->args
     * @return array
     */
    private function buildPayload($item)
    {
        $info = $item['info'];

        // Items may arrive without a session id; generate one rather than
        // sending an empty value.
        $sessionId = isset($item['session_id']) ? $item['session_id'] : $this->create_uuid();

        return [
            'data' => [
                'baseData' => ['state' => 0, 'ver' => 2],
                'baseType' => 'SessionStateData',
            ],
            'iKey' => $item['ikey'],
            'name' => 'Microsoft.ApplicationInsights.SessionState',
            'sampleRate' => 100,
            'tags' => [
                'ai.application.ver' => '1.0 (1)',
                'ai.device.id' => $info['device_id'],
                'ai.device.language' => $info['language'],
                'ai.device.locale' => $info['country'],
                'ai.device.model' => $info['device_model'],
                'ai.device.oemName' => $info['device_brand'],
                'ai.device.os' => $item['type'].' OS',
                'ai.device.osVersion' => $info['device_os_version'],
                'ai.device.screenResolution' => $info['resolution'],
                'ai.device.type' => 'Phone',
                'ai.internal.sdkVersion' => $item['sdk'].$info['device_os_version'],
                'ai.session.id' => $sessionId,
                'ai.session.isFirst' => $item['isFirst'],
                'ai.session.isNew' => 'true',
                'ai.user.id' => $info['user_id'],
            ],
            'time' => $item['times'],
            'ver' => 1,
        ];
    }
}
执行如下:
QUEUE=analysis php resque.php
QUEUE=analysis VVERBOSE=1 php ./resque.php
自带web 界面
https://www.jianshu.com/p/395652dc66f1
可是我的mac 在操作gem 命令时,一直提示 ruby 版本太低
升级 ruby brew install ruby
Mac 上brew install ruby 成功安装ruby-2.2.3 但是打ruby -v 依然显示2.0.0
用brew install ruby 会安装在/usr/local/Cellar/ruby/2.2.3/bin/ruby路径下
ruby -v
ruby 2.0.0p481 (2014-05-08 revision 45883) [universal.x86_64-darwin14]
@caoh ➜ ~ /usr/local/Cellar/ruby/2.2.3/bin/ruby -v
ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-darwin14]
resque-web -p 40000 线上需要开通40000端口号,也可以自己指定其他端口
以上是关于PHP的轻量消息队列php-resque使用说明的主要内容,如果未能解决你的问题,请参考以下文章