PHP异步任务之swoole
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP异步任务之swoole相关的知识,希望对你有一定的参考价值。
一、安装swoole
下载地址:https://github.com/swoole/swoole-src/releases/tag/1.8.12-stable
下载压缩包,上传到服务器(测试centos),解压缩
cd swoole-src-1.8.7-stable
phpize(phpize是用来扩展php扩展模块的,通过phpize可以建立php的外挂模块)
./configure
make && make install
添加php.ini扩展extension=swoole.so
通过php -m查看扩展信息如果出现swoole则为成功
测试make test出现上图信息,表示安装成功
二、问题解决
出现问题prce_open,为扩展问题,修改php.ini文件,去除即可
三、测试方案
四、服务端Demo(CI框架,以发送邮件为例)
<?php
class Swoole extends CI_Controller
{
static $index = 0;
static $serv;
private static $buffers = array();
/**
* @param $fd
* @return swoole_buffer
*/
static function getBuffer($fd, $create = true)
{
if (!isset(self::$buffers[$fd]))
{
if (!$create)
{
return false;
}
self::$buffers[$fd] = new swoole_buffer(1024 * 128);
}
return self::$buffers[$fd];
}
function my_log($msg)
{
global $serv;
if (empty($serv->worker_pid))
{
$serv->worker_pid = posix_getpid();
}
//echo "#".$serv->worker_pid."\t[".date(‘H:i:s‘)."]\t".$msg.PHP_EOL;
}
function my_onStart(swoole_server $serv)
{
global $argv;
swoole_set_process_name("php {$argv[0]}: master");
$this->my_log("Server: start.Swoole version is [".SWOOLE_VERSION."]");
$this->my_log("MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}");
}
function forkChildInWorker() {
global $serv;
$this->my_log("on worker start\n");
$process = new swoole_process( function (swoole_process $worker) use ($serv) {
});
$pid = $process->start();
$this->my_log("Fork child process success. pid={$pid}\n");
//保存子进程对象,这里如果不保存,那对象会被销毁,管道也会被关闭
$serv->childprocess = $process;
}
function processRename(swoole_server $serv, $worker_id) {
global $argv;
if ( $serv->taskworker)
{
swoole_set_process_name("php {$argv[0]}: task");
}
else
{
swoole_set_process_name("php {$argv[0]}: worker");
}
$this->my_log("WorkerStart: MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}|WorkerId={$serv->worker_id}|WorkerPid={$serv->worker_pid}");
}
function setTimerInWorker(swoole_server $serv, $worker_id) {
if ($worker_id == 0) {
$this->my_log("Start: ".microtime(true)."\n");
}
}
function my_onShutdown($serv)
{
$this->my_log("Server: onShutdown\n");
}
function my_onClose(swoole_server $serv, $fd, $from_id)
{
$this->my_log("Client[[email protected]$from_id]: fd=$fd is closed");
$buffer = Swoole::getBuffer($fd);
if ($buffer)
{
$buffer->clear();
}
}
function my_onConnect(swoole_server $serv, $fd, $from_id)
{
$this->my_log("Client: Connect --- {$fd}");
}
function timer_show($id)
{
$this->my_log("Timer#$id");
}
function my_onWorkerStart(swoole_server $serv, $worker_id)
{
$this->processRename($serv, $worker_id);
if (!$serv->taskworker)
{
swoole_process::signal(SIGUSR2, function($signo){
$this->my_log("SIGNAL: $signo\n");
});
$serv->defer(function(){
$this->my_log("defer call\n");
});
}
else
{
}
}
function my_onWorkerStop($serv, $worker_id)
{
$this->my_log("WorkerStop[$worker_id]|pid=".$serv->worker_pid.".\n");
}
function my_onPacket($serv, $data, $clientInfo)
{
$serv->sendto($clientInfo[‘address‘], $clientInfo[‘port‘], "Server " . $data);
var_dump($clientInfo);
}
function my_onFinish(swoole_server $serv, $task_id, $data)
{
list($str, $fd) = explode(‘-‘, $data);
$serv->send($fd, ‘taskok‘);
var_dump($str, $fd);
$this->my_log("AsyncTask Finish: result={$data}. PID=".$serv->worker_pid.PHP_EOL);
}
function my_onWorkerError(swoole_server $serv, $worker_id, $worker_pid, $exit_code, $signo)
{
$this->my_log("worker abnormal exit. WorkerId=$worker_id|Pid=$worker_pid|ExitCode=$exit_code|Signal=$signo\n");
}
function broadcast(swoole_server $serv, $fd = 0, $data = "hello")
{
$start_fd = 0;
$this->my_log("broadcast\n");
while(true)
{
$conn_list = $serv->connection_list($start_fd, 10);
if($conn_list === false)
{
break;
}
$start_fd = end($conn_list);
foreach($conn_list as $conn)
{
if($conn === $fd) continue;
$ret1 = $serv->send($conn, $data);
}
}
}
/**
* 开启swoole服务,处理相关异步任务
*/
public function service_start(){
$serv = new swoole_server("127.0.0.1", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
$serv->set(config_item(‘swoole‘));
Swoole::$serv = $serv;
$serv->on(‘PipeMessage‘, function($serv, $src_worker_id, $msg) {
$this->my_log("PipeMessage: Src={$src_worker_id},Msg=".trim($msg));
if ($serv->taskworker)
{
$serv->sendMessage("hello user process",
$src_worker_id);
}
});
$serv->on(‘Start‘, array($this, ‘my_onStart‘));
$serv->on(‘Connect‘, array($this, ‘my_onConnect‘));
$serv->on(‘Receive‘, array($this, ‘my_onReceive‘));
$serv->on(‘Packet‘, array($this, ‘my_onPacket‘));
$serv->on(‘Close‘, array($this, ‘my_onClose‘));
$serv->on(‘Shutdown‘, array($this, ‘my_onShutdown‘));
$serv->on(‘WorkerStart‘, array($this, ‘my_onWorkerStart‘));
$serv->on(‘WorkerStop‘, array($this, ‘my_onWorkerStop‘));
$serv->on(‘Task‘, array($this, ‘my_onTask‘));
$serv->on(‘Finish‘, array($this, ‘my_onFinish‘));
$serv->on(‘WorkerError‘, array($this, ‘my_onWorkerError‘));
$serv->on(‘ManagerStart‘, function($serv) {
global $argv;
swoole_set_process_name("php {$argv[0]}: manager");
});
$serv->start();
}
/**
* 接收客户端信息
* @param swoole_server $serv 服务端
* @param int $fd 主进程ID
* @param string $from_id 客户端ID
* @param string $data 数据
*/
function my_onReceive(swoole_server $serv, $fd, $from_id, $data)
{
print_r($data);
$this->my_log("Worker#{$serv->worker_pid} Client[[email protected]$from_id]: received: $data");
$serv->task($data, -1, function (swoole_server $serv, $task_id, $data)
{
$this->my_log("Task Callback: ");
var_dump($task_id, $data);
});
}
function my_onTask(swoole_server $serv, $task_id, $from_id, $data)
{
$data=json_decode($data,true);
switch ($data[‘cmd‘]){
case ‘sendemail‘:
$this->sendEmail($data);
break;
default:
break;
}
}
/**
* 发送邮件
* @param array $data 邮件内容
*/
private function sendEmail($data){
//TODO 发送邮件
}
}
五、客户端Demo
$client = new swoole_client(SWOOLE_SOCK_TCP);
$client->connect(‘127.0.0.1‘, 9501);
$data=array(
‘cmd‘=>‘sendemail‘,
‘to‘=>$to,
‘subject‘=>$subject,
‘message‘=>$message,
‘type‘=>$type
);
$client->send(json_encode($data));
$client->close();
六、swoole服务查看
七、swoole服务关闭
kill 29193
八、swoole后台PHP进程查看
本文出自 “kamil” 博客,转载请与作者联系!
以上是关于PHP异步任务之swoole的主要内容,如果未能解决你的问题,请参考以下文章