并行运行函数并使用队列获取输出

Posted

技术标签:

【中文标题】并行运行函数并使用队列获取输出【英文标题】:Run function in parallel and grab outputs using Queue 【发布时间】:2021-10-31 23:14:29 【问题描述】:

我想用不同的参数来取乐function。对于每个不同的参数,我想并行运行该函数,然后获取每次运行的输出multiprocessing 模块似乎可以在这里提供帮助。我不确定完成这项工作的正确步骤。

我是否启动所有进程,然后get 所有队列,然后按此顺序加入所有进程?还是我get加入后的结果?还是加入第i个进程后得到第i个结果?

from numpy.random import uniform
from multiprocessing import Process, Queue

def function(x):
    return uniform(0.0, x)

if __name__ == "__main__":
    queue = Queue()
    processes = []
    x_values = [1.0, 10.0, 100.0]
    
    # Start all processes
    for x in x_values:
        process = Process(target=function, args=(x, queue, ))
        processes.append(process)
        process.start()

    # Grab results of the processes?
    outputs = [queue.get() for _ in range(len(x_values))]
    
    # Not even sure what this does but apparently it's needed
    for process in processes:
        process.join()

【问题讨论】:

您能解释一下这些流程在做什么吗?我是他们向你返回了一些价值,还是他们参与了其他事情?个人 id 使用多处理池。另请注意,如果您产生的进程多于您拥有的核心......它实际上并没有做任何事情。对我来说,池比手动启动流程更直观一点,特别是如果您的案例中有很多 x_values。 @JasonChia 感谢您的评论。基本上你可以把我想运行的函数想象成一个实验。我想并行运行实验 100 次并存储输出(在我的实际用例中是字典)。我这样做的原因是我想看看我的实验平均表现如何,但每次实验运行大约需要 1 小时,所以我想并行化它。 @JasonChia 有意义吗?您将如何使用池?如果你能告诉我,你会是我的英雄! 【参考方案1】:

因此,让我们为多处理池做一个简单的例子,它有一个加载函数,该函数休眠 3 秒并返回传递给它的值(您的参数)以及函数的结果,它只是将它加倍。 IIRC 干净地停止池存在一些问题

from multiprocessing import Pool
import time

def time_waster(val):
    try:
        time.sleep(3)
    
        return (val, val*2) #return a tuple here but you can use a dict as well with all your parameters
    except KeyboardInterrupt:
        raise KeyboardInterruptError()
        
if __name__ == '__main__':
    x = list(range(5)) #values to pass to the function
    results = []
    try:
        with Pool(2) as p: #I use 2 but you can use as many as you have cores
            results.append(p.map(time_waster,x))
    except KeyboardInterrupt:
        p.terminate()
    except Exception as e:
        p.terminate()
    finally:
        p.join()
    print(results)

作为一项额外的服务,添加了一些键盘中断处理程序作为 IIRC,存在一些中断池的问题。https://***.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

【讨论】:

谢谢!我有一台 8 核的机器,并尝试了长度为 5 和长度为 8(保持 Pool(8))的输入 x 的解决方案,但时间完全不同。这告诉我它并没有真正并行地做任何事情 这几乎是因为运行 8 个池中的所有 5 个进程需要 3 秒。加上开销。如果您与在 for 循环中运行进行比较,您可以非常明显地看到它的好处。您可以尝试长度为 16 而不是 5 的 x。您将获得大约 3*2 秒 + 开销。而 for 循环将持续 16*3 秒。【参考方案2】:

proc.join() 阻塞直到进程结束。 queue.get() 阻塞,直到队列中有东西。因为您的进程没有将任何内容放入队列(在此示例中),所以此代码将永远不会超出 queue.get() 部分...如果您的进程在最后将某些内容放入队列中,那么它不会不管你是先加入()还是获取(),因为它们几乎同时发生。

【讨论】:

以上是关于并行运行函数并使用队列获取输出的主要内容,如果未能解决你的问题,请参考以下文章

Delayed_job - 多个并行队列?

使用不同的参数并行运行相同的函数,并知道哪个并行运行在 python 中结束了

如何一起使用多处理池和队列?

GCD使用 串行并行队列 与 同步异步执行的各种组合 及要点分析

一个非常简单的多线程并行 URL 获取(无队列)

如何安排一对不同的功能,以便真正并行运行?