Swoole系列3.5进程池与进程管理器

Posted 码农老张Zy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Swoole系列3.5进程池与进程管理器相关的知识,希望对你有一定的参考价值。

进程池与进程管理器

我们已经学习过单个进程相关的内容,也学习了进程间如何进行通信,但是,一个一个地进程还是非常不好管理,这不,Swoole 就为我们直接准备好了进程池以及进程管理相关的工具。

进程池

进程池就是管理多个工作进程的一个工具,它本身也是基于后面我们要讲的进程管理器。主要的核心功能就是去管理多个进程,让开发者无需编写过多代码即可实现进程管理功能,同时也可以创建纯协程风格的,能利用多核 CPU 的服务端程序。

$workNum = 5;

$pool = new \\Swoole\\Process\\Pool($workNum);

$pool->on('WorkerStart', function(\\Swoole\\Process\\Pool $pool, $workerId)
   echo "工作进程:$workerId, pid: " . posix_getpid() . " 开始运行!", php_EOL;
   while(1);
);

$pool->on("WorkerStop", function(\\Swoole\\Process\\Pool $pool, $workerId)
   echo "工作进程:$workerId, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
);

$pool->start();

上面就是最简单的一个进程池应用。我们只需要实例化一个 \\Swoole\\Process\\Pool 对象,给他一个进程数量的参数。然后监听它的 WorkerStart 方法以及 WorkerStop 方法。大家可以查看 ps ,会发现有 5 个子进程启动了。

在这里,我们注意到在 WorkerStart 中,使用了一个循环挂起进程。如果不使用这个循环的话,进程在 WorkerStart 的回调函数中执行完成就会结束。然后进程池为了维护进程数量,又会拉起一个新的进程。于是就会一直不停地创建和结束进程。大家可以自己尝试一下哦。

所有事件的回调函数中的参数都有两个,一个是 pool 对象本身,另一个是一个 WorkerId 。这个 WorkerId 不是进程 ID ,是进程池对于这个进程的编号,从 0 开始。就像我们在上篇文章中用过的 $workers 数组一样。其实进程池的底层也是维护了这样一个数组,并把数组 key 值也就是编号返回到了回调函数中。像这里我们指定了开启 5 个进程,那么它的 WorkerId 就是从 0 到 4 共 5 个进程的 WorkerId 。

对于 \\Swoole\\Process\\Pool 来说,其实这只是它默认的一种通信模式的表现。关于这个问题就牵涉到了它的第二个参数,默认情况下,这个参数的值是 SWOOLE_IPC_NONE ,我们其实还可以设置为 SWOOLE_IPC_UNIXSOCK/SWOOLE_IPC_MSGQUEUE/SWOOLE_IPC_SOCKET 这几种。这几种模式与 SWOOLE_IPC_NONE 有许多不同,需要强制监听的内容不同,也不需要循环挂起,我们接着往下看。

通信模式

对于不同的进程通信模式,我们一个一个的来看一下。首先就是 SWOOLE_IPC_UNIXSOCK 模式。其实很明显了,这就是我们上回讲过的最常用的管道方式的通信。

$pool = new \\Swoole\\Process\\Pool($workNum, SWOOLE_IPC_UNIXSOCK);

$pool->on('WorkerStart', function(\\Swoole\\Process\\Pool $pool, $workerId)
   $proc1 = $pool->getProcess(0);
   while(1)
       sleep(1);
       if($workerId == 0)
           echo $proc1->read(), PHP_EOL;
       else
           $proc1->write("hello proc1, this is proc" . ($workerId + 1));
       
   
);

$pool->on("Message", function(Swoole\\Process\\Pool $pool, $data)
);

$pool->start();

很简单很熟悉的代码吧,就是上回我们讲过的管道通信方式。我们让第一个子进程读取数据,其它的进程向它发送数据。getProcess() 方法就是用于从进程池中获取单个进程,返回的就是之前我们学习过的 \\Swoole\\Process 对象。如果不指定它的参数,那么返回的就是当前 WorkerId 的进程对象,如果指定了参数,返回的就是指定的 WorkerId 进程对象。

除了 SWOOLE_IPC_NONE 模式之外,其它的模式都要监听一个 Message 事件。而且,其它的模式可以不监听 WorkerStart 事件。这个我们后面也会看到应用。

另一种通信模式就是 SWOOLE_IPC_MSGQUEUE ,也就是消息队列模式。

$pool = new \\Swoole\\Process\\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);

$pool->on('WorkerStart', function(\\Swoole\\Process\\Pool $pool, $workerId)
   $process = $pool->getProcess();
   $process->useQueue(1, 2 | \\Swoole\\Process::IPC_NOWAIT);
   while(1)
       sleep(1);
       if($workerId == 0)
           foreach(range(1,4) as $v)
               $process->push("[$v]消息来了" . time());
           
       else
           $data = $process->pop();
           if($data)
               echo $process->pop(), ' workerid:', $workerId, PHP_EOL;
           
       
   
);

$pool->on("Message", function(Swoole\\Process\\Pool $pool, $data)
);

$pool->start();

同样地,这个测试代码中我们也是让第一个子进程 push() 数组,其它的子进程消费队列数据。也和我们上回讲过的队列通信方式是一模一样的。但是它还有不同的功能,这里我们就用队列来演示 Message 的作用。

外部通信

通过对操作系统的学习,我们知道队列是在系统中共用的,所有进程都可以接收到队列数据,它不是只受限于同一个程序的。因此,我们可以让另一个程序发送队列,而这边的程序接收队列,从而实现跨进程的通信。

$q = msg_get_queue(1);
foreach (range(1, 100) as $i) 
    $data = "消息来了" . microtime(true);
    msg_send($q, $i, $data, false);

我们先准备一个上面的代码文件,用于模拟另外的程序发送消息,然后准备下面的测试代码,用于接收发送来的队列消息。

$pool = new \\Swoole\\Process\\Pool($workNum, SWOOLE_IPC_MSGQUEUE, 1);

$pool->on("Message", function(Swoole\\Process\\Pool $pool, $data)
   var_dump($pool);
   $process = $pool->getProcess();
   echo $process->pid, PHP_EOL;
   var_dump($data);
);

$pool->start();

//object(Swoole\\Process\\Pool)#1 (2) 
//["master_pid"]=>
//  int(7114)
//  ["workers"]=>
//  array(1) 
//    [4]=>
//    object(Swoole\\Process)#3 (6) 
//    ["pipe"]=>
//      NULL
//      ["msgQueueId"]=>
//      NULL
//      ["msgQueueKey"]=>
//      NULL
//      ["pid"]=>
//      int(7119)
//      ["id"]=>
//      int(4)
//      ["callback":"Swoole\\Process":private]=>
//      NULL
//    
//  
//
//7119
//string(27) "消息来了1640313653.9647"
// ………………

首先运行起下面的代码,然后再开一个命令行执行上面的发送消息的代码,之后就能看到注释中返回的内容。在这里,我们这个进程池只监听了一个 Message 事件,这就说明 WorkerStart 事件确实并不是必须的哦。在这个 Message 事件中,我们可以看到每次处理的进程 ID 都是不同的,说明和之前我们理解的一样,这些进程也是在争抢处理队列数据。

除了这种消息队列之外,我们还可以使用一种方式,相信你看了以后会非常兴奋,那就是 SWOOLE_IPC_SOCKET 模式。

$pool = new \\Swoole\\Process\\Pool($workNum, SWOOLE_IPC_SOCKET);

$pool->listen('0.0.0.0', 8089);

$pool->on("Message", function(Swoole\\Process\\Pool $pool, $data)
   var_dump($data);
   $pool->write("你发来的数据是:\\"$data\\"");
);

$pool->start();

// [root@localhost source]# php 3.5进程池与进程管理器.php
// string(33) "客户端发消息1640318342.8369"
// string(33) "客户端发消息1640318344.8386"
// string(33) "客户端发消息1640318346.8397"

它会启动一个监听服务,所以必须要一个 listen() 方法指定监听 ip 和 端口 。然后客户端就可以直接建立 Socket 连接来与进程通信。除了 ip + 端口 的方式之外,它还可以 listen() 一个 UnixSocket 方式的连接,使用 "unix:/tmp/php.sock" 。

foreach (range(1, 3) as $i) 
    $fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\\n");
    $msg = "客户端发消息" . microtime(true);
    fwrite($fp, pack('N', strlen($msg)) . $msg);
    sleep(2);
    $data = fread($fp, 8192);
    if($data)
        var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
    
    fclose($fp);


// [root@localhost source]# php 3.52socketclient.php
// string(59) "你发来的数据是:"客户端发消息1640318342.8369""
// string(59) "你发来的数据是:"客户端发消息1640318344.8386""
// string(59) "你发来的数据是:"客户端发消息1640318346.8397""

熟悉吗?亲切吗?想想 php-fpm 的两种监听方式,再想想我们把上面的端口换成 9000 。这不就是一套 php-fpm 嘛!!

关闭进程

关闭进程就是一个 shutdown() 方法。这个方法必须在 start() 之后,在 WorkerStart 或其它回调函数中调用。

$pool = new \\Swoole\\Process\\Pool($workNum, SWOOLE_IPC_UNIXSOCK);

$pool->on('WorkerStart', function(\\Swoole\\Process\\Pool $pool, $workerId)
   if($workerId == 0)
       echo "Shutdown Worker:$workerId, pid:" . posix_getpid(), PHP_EOL;
       $pool->shutdown();
   
);

$pool->on('Message', function(\\Swoole\\Process\\Pool $pool, $workerId)
);

$pool->start();

// [root@localhost source]# php 3.5进程池与进程管理器.php
// Shutdown Worker:0, pid:7247

// [root@localhost source]# ps -ef | grep php
//    root      7246  4402  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
//    root      7248  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
//    root      7249  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
//    root      7250  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php
//    root      7251  7246  0 23:13 pts/1    00:00:00 php 3.5?程池与?程管理器.php

脱离进程

脱离进程的意思是将进程池当前 Worker 进程脱离管理,底层会立即创建新的进程,老的进程不再处理数据,由应用层代码自行管理它的生命周期。

$pool = new \\Swoole\\Process\\Pool(2);

$pool->on('WorkerStart', function (\\Swoole\\Process\\Pool $pool, $workerId) 

   $i = 0;
   while (1) 
       sleep(1);
       $i++;
       if ($i == 5) 
           echo "Detach Worker:$workerId, pid:" . posix_getpid(), PHP_EOL;
           $pool->detach();
        else if ($i == 10) 
           break;
       
   

);

$pool->on("WorkerStop", function (\\Swoole\\Process\\Pool $pool, $workerId) 
   echo "工作进程:$workerId, pid: " . posix_getpid() . " 结束运行!", PHP_EOL;
);


$pool->start();

// [root@localhost source]# php 3.5进程池与进程管理器.php
// Detach Worker:1, pid:16336
// Detach Worker:0, pid:16335
// 工作进程:0, pid: 16335 结束运行!
// 工作进程:1, pid: 16336 结束运行!
// [2021-12-23 23:33:42 @16334.0] WARNING ProcessPool::wait(): [Manager]unknown worker[pid=16335]
// [2021-12-23 23:33:42 @16334.0] WARNING ProcessPool::wait(): [Manager]unknown worker[pid=16336]
// Detach Worker:1, pid:16337
// Detach Worker:0, pid:16338

你可以把上面代码的 sleep() 时间拉长一下,然后就可以通过 ps 看到同时有更多的进程出现,进程池是在脱离触发后就马上拉起新的子进程,然后原来的子进程则是运行完自己的工作之后就自己释放掉了。

进程管理器

最后就是进程管理器,其实进程池底层就是基于进程管理器的。

$pm = new \\Swoole\\Process\\Manager();

for ($i = 0; $i < 2; $i++) 
    $pm->add(function (\\Swoole\\Process\\Pool $pool, $workerId) 
        echo "工作进程:$workerId, pid: " . posix_getpid() . " 开始运行!", PHP_EOL;
        while (1) ;
    );


$pm->start();

使用 \\Swoole\\Process\\Manager 对象的 add 方法直接就可以添加进程,其实就和我们之前去 new \\Swoole\\Process() 对象,并将对象放到一个 $workers 数组的感觉差不多,只不过在底层它处理的事情更多,比如说帮我们 wait() 进程,进行一些挂起操作等等。

进程管理器还有一些别的方法,不过其实直接使用进程管理器的地方不多,毕竟我们还是直接使用进程池会更方便一些。大家可以自己去官方文档中进行更深入的了解。

总结

今天的内容就是关于 Swoole 中两个进程管理工具的学习,更多情况下我们直接使用进程池就好了。其实从这里,你也就能够想到了,之前我们看到过的各种服务,Http、TCP 这些服务器应用,其实就是在实现了服务接口协议之后,通过进程池拉起 Worker 进程来实现多进程服务处理的。而 TaskWorker 其实也是再分出来的另一套进程池,与主进程中的服务处理进程是分离的,可以实现具体任务的异步并行处理。

当然,上面的内容是我自己的理解,如有纰漏,欢迎大家指正。

测试代码:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.5%E8%BF%9B%E7%A8%8B%E6%B1%A0%E4%B8%8E%E8%BF%9B%E7%A8%8B%E7%AE%A1%E7%90%86%E5%99%A8.php

参考文档:

https://wiki.swoole.com/#/process/process_pool

https://wiki.swoole.com/#/process/process_manager

以上是关于Swoole系列3.5进程池与进程管理器的主要内容,如果未能解决你的问题,请参考以下文章

Swoole系列3.2Swoole 异步进程服务系统

使用swoole管理进程

Swoole系列3.6进程同步与共享内存

Swoole系列3.4进程间通信

Python 3 进程池与回调函数

进程池与回调函数