CI框架集成php-queue消息队列

Posted 鲍一翔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CI框架集成php-queue消息队列相关的知识,希望对你有一定的参考价值。

1、首先下载ci模板

https://codeigniter.org.cn/

2、在根目录新建composer.json配置文件,内容在本文最下面如下

3、目录结构如图

4、由于执行composer不可以使用root权限,使用useradd www-deploy -g www新建用户

5、执行命令

su www-deploy -c "composer install"

CI框架集成php-queue消息队列

过程有点慢,耐心等几分钟

6、下载好之后会在当前文件层级下多出vender文件夹,检查这个参数是否指向类库文件,用于自动加载类包的检索路径

CI框架集成php-queue消息队列

7、好了之后在入口文件index.php最后面引用自动加载文件的路径

require_once './vendor/autoload.php';

8、controllers目录下编写业务代码,其他配置文件不需要做修改

以下代码实现生产者功能

<?php

defined('BASEPATH') OR exit('No direct script access allowed');


class Welcome extends CI_Controller {


    public function __construct()

    {

        parent::__construct();

        date_default_timezone_set('GMT');

        Resque::setBackend('127.0.0.1:6379');

    }


    /**

     * @param string $que

     * @param string $job

     * 实现生产者功能,根据传入的queue和job创建任务

     */

    public function index($que = '',$job = '')

    {

        $argv = func_get_args();

        Resque_Event::trigger('beforeEnqueue', $argv);


        $args = array(

            'time' => time(),

            'array' => array(

                'test' => 'test',

            ),

        );

        if (empty($argv[1])) {

            $jobId = Resque::enqueue('default', $argv[0], $args, true);

        } else {

            $jobId = Resque::enqueue($argv[0], $argv[1], $args, true);

        }

        echo "Queued job ".$jobId."\n\n";

    }


    /**

     * @param $jobId

     * 检查job处理情况

     */

    public function checkStatus($jobId){

        $argv = array('',$jobId);

        if(empty($argv[1])) {

            die('Specify the ID of a job to monitor the status of.');

        }


        $status = new Resque_Job_Status($argv[1]);

        if(!$status->isTracking()) {

            die("Resque is not tracking the status of this job.\n");

        }


        die(sprintf("Status of %s is: %s",$argv[1],$status->get()));

    }

}

以下代码实现消费者功能

测试效果在终端执行

QUEUE=queue php index.php job/resque

看到这个表示成功

<?php

defined('BASEPATH') OR exit('No direct script access allowed');


/**

 * Class Job

 * 这个类名必须与创建job时候定义的job名一致,而且不需要继承CI_Controller

 */

class Job {

    public function perform()

    {

        // Work work work

        print_r($this->args);

    }


    public function resque(){

        $QUEUE = getenv('QUEUE');

        if(empty($QUEUE)) {

            die("Set QUEUE env var containing the list of queues to work.\n");

        }


        $REDIS_BACKEND = getenv('REDIS_BACKEND');


// A redis database number

        $REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB');

        if(!empty($REDIS_BACKEND)) {

            if (empty($REDIS_BACKEND_DB))

                Resque::setBackend($REDIS_BACKEND);

            else

                Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB);

        }


        $logLevel = false;

        $LOGGING = getenv('LOGGING');

        $VERBOSE = getenv('VERBOSE');

        $VVERBOSE = getenv('VVERBOSE');

        if(!empty($LOGGING) || !empty($VERBOSE)) {

            $logLevel = true;

        }

        else if(!empty($VVERBOSE)) {

            $logLevel = true;

        }


        $APP_INCLUDE = getenv('APP_INCLUDE');

        if($APP_INCLUDE) {

            if(!file_exists($APP_INCLUDE)) {

                die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n");

            }


            require_once $APP_INCLUDE;

        }


// See if the APP_INCLUDE containes a logger object,

// If none exists, fallback to internal logger

        if (!isset($logger) || !is_object($logger)) {

            $logger = new Resque_Log($logLevel);

        }


        $BLOCKING = getenv('BLOCKING') !== FALSE;


        $interval = 5;

        $INTERVAL = getenv('INTERVAL');

        if(!empty($INTERVAL)) {

            $interval = $INTERVAL;

        }


        $count = 1;

        $COUNT = getenv('COUNT');

        if(!empty($COUNT) && $COUNT > 1) {

            $count = $COUNT;

        }


        $PREFIX = getenv('PREFIX');

        if(!empty($PREFIX)) {

            $logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX));

            Resque_Redis::prefix($PREFIX);

        }


        if($count > 1) {

            for($i = 0; $i < $count; ++$i) {

                $pid = Resque::fork();

                if($pid === false || $pid === -1) {

                    $logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i));

                    die();

                }

                // Child, start the worker

                else if(!$pid) {

                    $queues = explode(',', $QUEUE);

                    $worker = new Resque_Worker($queues);

                    $worker->setLogger($logger);

                    $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));

                    $worker->work($interval, $BLOCKING);

                    break;

                }

            }

        }

// Start a single worker

        else {

            $queues = explode(',', $QUEUE);

            $worker = new Resque_Worker($queues);

            $worker->setLogger($logger);


            $PIDFILE = getenv('PIDFILE');

            if ($PIDFILE) {

                file_put_contents($PIDFILE, getmypid()) or

                die('Could not write PID information to ' . $PIDFILE);

            }


            $logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));

            $worker->work($interval, $BLOCKING);

        }

    }

}

composer.json:

{

  "name": "chrisboulton/php-resque",

  "type": "library",

  "description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby.",

  "keywords": ["job", "background", "redis", "resque"],

  "homepage": "http://www.github.com/chrisboulton/php-resque/",

  "license": "MIT",

  "authors": [

    {

      "name": "Chris Boulton",

      "email": "chris@bigcommerce.com"

    }

  ],

  "repositories": [

    {

      "type": "vcs",

      "url": "https://github.com/chrisboulton/credis"

    }

  ],

  "require": {

    "php": ">=5.3.0",

    "ext-pcntl": "*",

    "colinmollenhour/credis": "~1.7",

    "psr/log": "~1.0"

  },

  "suggest": {

    "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",

    "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available."

  },

  "require-dev": {

    "phpunit/phpunit": "3.7.*"

  },

  "bin": [

    "application/controllers/resque"

  ],

  "autoload": {

    "psr-0": {

      "Resque": "application/libraries"

    }

  }

}


以上是关于CI框架集成php-queue消息队列的主要内容,如果未能解决你的问题,请参考以下文章

基于java消息队列的分布式RPC开源框架

初识消息队列处理机框架KClient

CI-持续集成

后台消息队列处理简易框架

OF框架在Azure中配置持续集成CI

Laravel 集成 RabbitMQ 消息队列