How to run PHP object methods in parallel and sync the results into an array

Posted: 2019-04-12 06:47:57

【Question】:

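Hello, I'm trying to find a way to run PHP object methods in parallel.

I've looked through several solutions for multithreading in PHP, but I can't find a way to run object methods in parallel. Could someone explain what I'm doing wrong and suggest a fix for any of the solutions below, or an alternative example using the Countries class, where the get_data method runs in multiple parallel processes?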

    1. pcntl_fork() - forking with PHP
    2. Pthreads - PHP extension
    3. misterion/ko-process - composer package
    4. duncan3dc/fork-helper - composer package
    5. illuminate/queue - composer package

Testing pcntl_fork()

    <?php

    class Countries {
        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // Assumed log destination; the original post never defines $log.
    $log = __DIR__.'/parallel.log';

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    foreach ($countries as $country) {

        $pid = pcntl_fork();

        if (!$pid) {

            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);

            error_log( date('Y-m-d H:i:s').' - !pid -> d['.$country.']  ='.var_export($d[$country],true)." \n", 3, $log);
            exit($country);
        }
    }

    while (pcntl_waitpid(0, $status) != -1);
    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);

    ?>

Testing Pthreads

<?php

// Assumed log destination; the original post never defines $log.
$log = __DIR__.'/parallel.log';

if (extension_loaded('pthreads')) {

    $pool = new Pool(4);

    class Countries {
        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    $threads = [];
    $d = [];

    $start_time = microtime(true);
    foreach ($countries as $country) {
        $dataN = new Threaded();
        $dataN->country = $country;
        $dataN->os = $os;
        $dataN->result = "";

        $threads[] = $dataN;

        $pool->submit(
            new class($dataN) extends Threaded {
                public $data;

                public function __construct($data)
                {
                    $this->data = $data;
                }

                public function run()
                {
                    $this->data->result = $this->data->os->get_data($this->data->country);
                }
            }
        );
    }

    while ($pool->collect());

    $pool->shutdown();

    foreach ($threads as $thread) {
        error_log( date('Y-m-d H:i:s').' - d['.$thread->country.'] = '.var_export($thread->result,true)."\n", 3, $log);
        $d[$thread->country] = $thread->result;
    }

    // do something with $d[$country] here after all threads completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 2. PHP PThreads example duration='.$duration."\n", 3, $log);
} else {
    error_log( date('Y-m-d H:i:s').' - pthreads extension is not loaded!'."\n", 3, $log);
}

?>

Testing misterion/ko-process

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

    class Countries {
        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // Assumed log destination; the original post never defines $log.
    $log = __DIR__.'/parallel.log';

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    $manager = new Ko\ProcessManager();

    foreach ($countries as $country) {

        // The closure needs use (...) to see $os, $country and $log at all.
        $manager->fork(function(Ko\Process $p) use ($os, $country, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);
        });
    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the processes to finish... \n", 3, $log);
    $manager->wait();

    error_log( date('Y-m-d H:i:s')." - processes finished. \n", 3, $log);

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 3. misterion/ko-process example duration='.$duration."\n", 3, $log);

?>

Testing duncan3dc/fork-helper

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

    class Countries {
        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // Assumed log destination; the original post never defines $log.
    $log = __DIR__.'/parallel.log';

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    $fork = new \duncan3dc\Forker\Fork;

    foreach ($countries as $country) {

        // The closure needs use (...) to see $os, $country and $log at all.
        $fork->call(function () use ($os, $country, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
            // How to execute $os->get_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);
        });
    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the processes to finish... \n", 3, $log);

    $fork->wait();
    error_log( date('Y-m-d H:i:s')." - processes finished. \n", 3, $log);

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 4. duncan3dc/fork-helper example duration='.$duration."\n", 3, $log);

?>

Testing illuminate/queue

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

use Illuminate\Queue\Capsule\Manager as Queue;

    class Countries {

        public $data;

        function __construct($country) {
            $this->data[$country] = $this->get_data($country);
        }

        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    // No shared $os instance here: the constructor now requires a country,
    // so "new Countries" without an argument would fail.

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // Assumed log destination; the original post never defines $log.
    $log = __DIR__.'/parallel.log';

    $start_time = microtime(true);

    $queue = new Queue;

    $queue->addConnection([
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
    ]);

    // Make this Capsule instance available globally via static methods... (optional)
    //$queue->setAsGlobal();

    // how to add and control a limit of max processes running at the same time?
    foreach ($countries as $country) {
        $d[$country] = $queue->push('Countries', array("country"=>$country));
    }
    // how to get results after all processes completed into $d[$country]?
    // do something with results

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 5. illuminate/queue example duration='.$duration."\n", 3, $log);

?>

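【Comments】:

【Answer 1】:

I can't help with pthreads, ko-process, fork-helper, or queue (I have no experience with any of them), but here is one way to get your code working with pcntl_fork, using sockets to pass messages between the child processes and the parent process: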

<?php
    class Countries {
        function get_data($country) {
            usleep(1000);
            $data = [];
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = ["GB", "US", "FR", "DE", "IT", "ES", "LT", "BR", "BE", "JP", "CN"];

    // Assumed log destination; the original post never defines $log.
    $log = __DIR__.'/parallel.log';

    // To answer your question about limiting the number of concurrent processes, you
    // need to limit the number of times you call pcntl_fork(). You might do something
    // like:
    //    1. Chunk the $countries array: [["GB", "US"], ["FR", "DE"], ["IT", "ES"], ...
    //    2. Call pcntl_fork() once for each inner array (half as many)
    //    3. Child process calls $os->get_data() once for each country in the sub-array
    //
    // Another solution is to utilize what's known as a "Pool" -- where you give a
    // collection of tasks to a class which spins up workers for you and hands tasks to
    // workers as they become available. This method abstracts the complexity of
    // multiprocessing, but will require you to find a third-party library you like or
    // implement the Pool class on your own.
    $start_time = microtime(true);

    // Initialize $d in the parent process (before pcntl_fork())
    $d = [];

    // Keep a list of child processes, so that we can wait for ALL of them to terminate
    $pids = [];

    // Initialize a socket for message passing (see below)
    socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $socket);

    foreach ($countries as $country) {
        $pid = pcntl_fork();

        if (!$pid) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

            // To answer your question about how to collect the result in the $d array,
            // you need to pass the results back to the parent process via some message
            // channel. The easiest solution I know of is a socket pair.
            //
            // In order for the socket to be available to both the parent and child,
            // the socket must be created before you fork (see above)
            $data = serialize($os->get_data($country));

            // Sockets are just raw byte streams with no organization or semantics. It's
            // up to you to understand the output of the socket. I devised a basic
            // protocol here where I begin with the country code, follow it up with a
            // serialized data structure, then terminate with a double-new-line
            socket_write($socket[0], $country . " " . $data . "\n\n");
            socket_close($socket[0]);
            exit();
        }

        $pids[] = $pid;
    }

    // The parent never writes, so close its copy of the write end. socket_read()
    // below then returns an empty string once every child has closed its end.
    socket_close($socket[0]);

    // Keep reading from the socket until there's no data left. Reading before
    // waiting keeps a child from blocking on a full socket buffer.
    $data = '';
    while (($new_data = socket_read($socket[1], 1024)) !== false && $new_data !== '') {
        $data .= $new_data;
    }

    // Wait for all child processes to finish
    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status);
    }

    // Split at double-new-line to get individual return values
    $results = explode("\n\n", $data);

    // Now parse the results (per my custom protocol I defined above)
    foreach ($results as $result) {
        if ($result === '') {
            continue; // the trailing delimiter leaves one empty chunk
        }
        $country = substr($result, 0, 2);
        $result = substr($result, 3);
        $d[$country] = unserialize($result);
    }

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration, 3);

    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);

?>
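
The comments in the code above mention limiting how many children run at once. Below is a minimal sketch of that idea, not a definitive implementation. It assumes PHP CLI with the pcntl and sockets extensions loaded. The helper name `run_in_pool()` and its parameters are hypothetical, not part of any library. Unlike the single shared socket above, each child gets its own socket pair, so results from different children cannot interleave.

```php
<?php
// Hypothetical helper (not from the original answer): runs $task($item) in forked
// children, never more than $maxChildren at once, and returns [item => result].
function run_in_pool(array $items, callable $task, int $maxChildren): array
{
    $results = [];
    $running = []; // pid => [item, socket the parent reads from]

    // Collect the oldest running child: read until EOF, then reap the process.
    $collectOldest = function () use (&$running, &$results) {
        reset($running);
        $pid = key($running);
        [$item, $sock] = $running[$pid];
        unset($running[$pid]);

        $payload = '';
        while (($chunk = socket_read($sock, 8192)) !== false && $chunk !== '') {
            $payload .= $chunk;
        }
        socket_close($sock);
        pcntl_waitpid($pid, $status);
        $results[$item] = unserialize($payload);
    };

    foreach ($items as $item) {
        // Pool is full: block until the oldest child has delivered its result.
        if (count($running) >= $maxChildren) {
            $collectOldest();
        }

        socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair);
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('pcntl_fork() failed');
        }
        if ($pid === 0) {
            // Child: send the serialized result, then exit before any parent code runs.
            socket_close($pair[1]);
            $bytes = serialize($task($item));
            while ($bytes !== '') {
                $written = socket_write($pair[0], $bytes);
                if ($written === false) {
                    exit(1);
                }
                $bytes = (string) substr($bytes, $written);
            }
            socket_close($pair[0]);
            exit(0);
        }

        // Parent: close the write end so EOF arrives once the child is done.
        socket_close($pair[0]);
        $running[$pid] = [$item, $pair[1]];
    }

    while ($running) {
        $collectOldest();
    }
    return $results;
}

// Usage: at most 2 children alive at any time.
$d = run_in_pool(['GB', 'US', 'FR', 'DE'], function ($country) {
    usleep(1000);
    return array_fill(0, 1000, $country);
}, 2);
```

Waiting on the oldest child keeps the sketch short. A fuller pool would use `socket_select()` to collect whichever child finishes first.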

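One thing I do want to point out: multiprocessing often doesn't magically make a program run faster the way people expect. If a task is CPU-bound (that is, you spend all your time executing complex CPU operations), then once every available core is already busy, adding more processes either has no effect or makes things slower. If a task is IO-bound (that is, you spend all your time waiting for network or disk operations to complete), then you can speed it up significantly by letting the processor do meaningful work instead of sitting on its hands waiting.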
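
The IO-bound case can be illustrated with a small timing sketch, assuming PHP CLI with the pcntl extension. The function `fake_io_task()` is a hypothetical stand-in that only sleeps, as a slow network or disk call would. Actual timings depend on the machine, so none are claimed here.

```php
<?php
// Hypothetical stand-in for an IO-bound task: it only waits, like a slow network call.
function fake_io_task(): void
{
    usleep(200000); // 0.2 s
}

$tasks = 4;

// Sequential: the waits add up.
$start = microtime(true);
for ($i = 0; $i < $tasks; $i++) {
    fake_io_task();
}
$sequential = microtime(true) - $start;

// Forked: the waits overlap, because every child sleeps at the same time.
$start = microtime(true);
$pids = [];
for ($i = 0; $i < $tasks; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('pcntl_fork() failed');
    }
    if ($pid === 0) {
        fake_io_task();
        exit(0); // child stops here and never runs the parent's code
    }
    $pids[] = $pid;
}
foreach ($pids as $pid) {
    pcntl_waitpid($pid, $status);
}
$parallel = microtime(true) - $start;

printf("sequential: %.3fs, forked: %.3fs\n", $sequential, $parallel);
```

Replacing the sleep with a tight arithmetic loop and raising `$tasks` well above the number of cores would show the CPU-bound case, where the forked version stops gaining.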

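【Discussion】:

Thanks for the help, I'll try to use it in a more complex example. Your example does seem to work. One more thing: do you have any third-party library preference for pools?

@anonymous007 Unfortunately, I haven't done any pooling work in PHP. For Python I use multiprocess, and for Rust I've been looking into tokio. From a quick Google search, though, it looks like pthreads has built-in support for thread pools: secure.php.net/manual/en/class.pool.php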
