swoole 异步队列

Posted Funsion Wu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了swoole 异步队列相关的知识,希望对你有一定的参考价值。

安装步骤如下(推荐把安装文件下载到 /usr/local/src 目录下):
step 1: wget --no-check-certificate https://github.com/swoole/swoole-src/archive/v1.9.13.tar.gz
step 2: tar zxf v1.9.13.tar.gz
step 3: cd swoole-src-1.9.13
step 4phpize 
step 5: ./configure --with-php-config=/usr/local/php/bin/php-config
step 6: make && make install
step 7: 修改 php.ini ,加入 extension=swoole.so ,然后重启 php-fpm
step 8: 执行 php --info | grep swoole 查看设置是否生效

############################  消费者,文件名《server.php》  ############################
<?php
class Server {
    private $serv;
    private $logFile;
    
    public function __construct() {
        $this->serv = new swoole_server(\'0.0.0.0\', 9501);   // 允许所有IP访问
        $this->serv->set([
            \'worker_num\' => 4,       // 一般设置为服务器CPU数的1-4倍
            \'task_worker_num\' => 1,  // task进程的数量(一般任务都是同步阻塞的,可以设置为多进程单线程)
            \'daemonize\' => true,      // 以守护进程执行
            \'package_eof\' => PHP_EOL,  // 设置EOF
            \'open_eof_split\' => true,  // 按照EOF进行分包,防止TCP粘包
            \'max_request\' => 5000,   // 设置worker进程的最大任务数,软重启,防止可能存在的内存溢出
            \'task_max_request\' => 5000,   // 设置Task线程的最大任务数,软重启,防止可能存在的内存溢出
        //  \'task_ipc_mode\' => 1,      // 使用unix socket通信,默认模式
        //  \'log_file\' => \'/data/log/queue.log\' ,    // swoole日志
        
        // 数据包分发策略(dispatch_mode=1/3时,底层会屏蔽onConnect/onClose事件,
        // 原因是这2种模式下无法保证onConnect/onClose/onReceive的顺序,非请求响应式的服务器程序,请不要使用模式1或3)
        //  \'dispatch_mode\' => 2,        // 固定模式,根据连接的文件描述符分配worker。这样可以保证同一个连接发来的数据只会被同一个worker处理
        ]);

        $this->logFile = dirname(__FILE__).\'/log.txt\';        // 以守护进程执行的时候,要绝对路径
        $this->serv->on(\'Receive\', [$this, \'onReceive\']);
        $this->serv->on(\'Task\', [$this, \'onTask\']);
        $this->serv->on(\'Finish\', [$this, \'onFinish\']);
        $this->serv->start();
    }
    
    /**
    * 接收到数据时回调此函数,发生在worker进程中
    * $server,swoole_server对象
    * $fd,TCP客户端连接的文件描述符
    * $from_id,TCP连接所在的Reactor线程ID
    * $data,收到的数据内容,可能是文本或者二进制内容
    */
    public function onReceive($serv, $fd, $from_id, $data ) {
        $str  = "=========== onReceive ============ \\n";
        $str .= "Get Message From Client $fd:$data \\n";
        error_log($str, 3, $this->logFile);
        $serv->task( $data );
    }
    
    /**
    * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,
    * 这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
    * $task_id是任务ID,由swoole扩展内自动生成,用于区分不同的任务。$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
    * $src_worker_id来自于哪个worker进程
    * $data 是任务的内容
    */
    public function onTask($serv, $task_id, $src_worker_id, $data) {
        $data   = trim($data);  // 删除EOF
        $array  = json_decode( $data , true );
        $str    = "=========== onTask ============ \\n";
        $str   .= var_export($array, 1);
        error_log($str, 3 , $this->logFile);
        return $array;
    }
    
    /**
    * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程
    * $task_id是任务的ID
    * $data是任务处理的结果内容(也就是onTask()函数,中return的值)
    */
    public function onFinish($serv, $task_id, $data) {
        $str  = "=========== onFinish ============ \\n";
        $str .= "Task $task_id finish ! \\n";
        $str .= var_export($data, 1);
        error_log($str, 3, $this->logFile);
    }

}
$server = new Server();


############################  生产者,文件名《client.php》  ############################
<?php
class Client {
    
    private function _sendData($data) {
        // dispatch_mode=2, 固定模式,根据连接的文件描述符分配worker
        // 每次都要重新new ,不可做成单例(这样才有不同的连接文件描述符,进而有不同的task_id)
        $client = new \\swoole_client(SWOOLE_SOCK_TCP);
        $client->connect(\'127.0.0.1\', 9501, 1);
        // 发送数据包,一定要在最后指明断行(防止“粘包”),一般值 PHP_EOL ,与配置参数package_eof的值保持一致
        $client->send($data.PHP_EOL);
    }
    
    public function run() {
        for($i=0; $i<10; $i++) {
            $data = [  \'time\'   => rand(1000, 9999),
                      \'id\'     => $i ];
            $jsonData = json_encode($data);
            $this->_sendData($jsonData);
        }
    }
}


############################  华丽的分割线  ############################

先执行 php server.php (执行一次,会以守护进程运行)
然后再执行 php client.php

# 内核参数调整(关键步骤)
http://wiki.swoole.com/wiki/page/p-server/sysctl.html

# swoole 离线手册,里面有更多的示例
https://github.com/smalleyes/swoole-chm

 

以上是关于swoole 异步队列的主要内容,如果未能解决你的问题,请参考以下文章

swoolephp5.6 swoole(demo)小测试

php代码在WSL系统上运行swoole 扩展.还需要安装搭建服务器吗?

PHP使用Swoole搭建一个异步服务

Swoole 初识

php异步多线程swoole用法实例

学习swoole的心得