并行运行函数并使用队列获取输出
Posted
技术标签:
【中文标题】并行运行函数并使用队列获取输出【英文标题】:Run function in parallel and grab outputs using Queue 【发布时间】:2021-10-31 23:14:29 【问题描述】:我想用不同的参数来取乐function
。对于每个不同的参数,我想并行运行该函数,然后获取每次运行的输出。 multiprocessing
模块似乎可以在这里提供帮助。我不确定完成这项工作的正确步骤。
我是否启动所有进程,然后
get
所有队列,然后按此顺序加入所有进程?还是我get
加入后的结果?还是加入第i个进程后得到第i个结果?
from numpy.random import uniform
from multiprocessing import Process, Queue
def function(x):
return uniform(0.0, x)
if __name__ == "__main__":
queue = Queue()
processes = []
x_values = [1.0, 10.0, 100.0]
# Start all processes
for x in x_values:
process = Process(target=function, args=(x, queue, ))
processes.append(process)
process.start()
# Grab results of the processes?
outputs = [queue.get() for _ in range(len(x_values))]
# Not even sure what this does but apparently it's needed
for process in processes:
process.join()
【问题讨论】:
您能解释一下这些流程在做什么吗?我是他们向你返回了一些价值,还是他们参与了其他事情?个人 id 使用多处理池。另请注意,如果您产生的进程多于您拥有的核心......它实际上并没有做任何事情。对我来说,池比手动启动流程更直观一点,特别是如果您的案例中有很多 x_values。 @JasonChia 感谢您的评论。基本上你可以把我想运行的函数想象成一个实验。我想并行运行实验 100 次并存储输出(在我的实际用例中是字典)。我这样做的原因是我想看看我的实验平均表现如何,但每次实验运行大约需要 1 小时,所以我想并行化它。 @JasonChia 有意义吗?您将如何使用池?如果你能告诉我,你会是我的英雄! 【参考方案1】:因此,让我们为多处理池做一个简单的例子,它有一个加载函数,该函数休眠 3 秒并返回传递给它的值(您的参数)以及函数的结果,它只是将它加倍。 IIRC 干净地停止池存在一些问题
from multiprocessing import Pool
import time
def time_waster(val):
try:
time.sleep(3)
return (val, val*2) #return a tuple here but you can use a dict as well with all your parameters
except KeyboardInterrupt:
raise KeyboardInterruptError()
if __name__ == '__main__':
x = list(range(5)) #values to pass to the function
results = []
try:
with Pool(2) as p: #I use 2 but you can use as many as you have cores
results.append(p.map(time_waster,x))
except KeyboardInterrupt:
p.terminate()
except Exception as e:
p.terminate()
finally:
p.join()
print(results)
作为一项额外的服务,添加了一些键盘中断处理程序作为 IIRC,存在一些中断池的问题。https://***.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
【讨论】:
谢谢!我有一台 8 核的机器,并尝试了长度为5
和长度为 8
(保持 Pool(8)
)的输入 x 的解决方案,但时间完全不同。这告诉我它并没有真正并行地做任何事情
这几乎是因为运行 8 个池中的所有 5 个进程需要 3 秒。加上开销。如果您与在 for 循环中运行进行比较,您可以非常明显地看到它的好处。您可以尝试长度为 16 而不是 5 的 x。您将获得大约 3*2 秒 + 开销。而 for 循环将持续 16*3 秒。【参考方案2】:
proc.join() 阻塞直到进程结束。 queue.get() 阻塞,直到队列中有东西。因为您的进程没有将任何内容放入队列(在此示例中),所以此代码将永远不会超出 queue.get() 部分...如果您的进程在最后将某些内容放入队列中,那么它不会不管你是先加入()还是获取(),因为它们几乎同时发生。
【讨论】:
以上是关于并行运行函数并使用队列获取输出的主要内容,如果未能解决你的问题,请参考以下文章
使用不同的参数并行运行相同的函数,并知道哪个并行运行在 python 中结束了