如何一起使用多处理池和队列?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何一起使用多处理池和队列?相关的知识,希望对你有一定的参考价值。
我需要在超级计算机上执行~18000有点昂贵的计算,我正在试图弄清楚如何并行化代码。我有它主要使用multiprocessing.Process,但如果我做了超过350次计算,它会挂在.join()步骤。
管理超级计算机的计算机科学家之一建议我使用multiprocessing.Pool而不是Process。
使用Process时,我会设置一个输出队列和一个进程列表,然后运行并加入这样的进程:
output = mp.Queue()
processes = [mp.Process(target=some_function,args=(x,output)) for x in some_array]
for p in processes:
p.start()
for p in processes:
p.join()
因为processes
是一个列表,它是可迭代的,我可以在列表理解中使用output.get()
来获得所有结果:
result = [output.get() for p in processes]
使用Pool时,相当于什么?如果Pool不可迭代,我如何获得其中每个进程的输出?
这是我尝试使用虚拟数据和虚拟计算:
import pandas as pd
import multiprocessing as mp
##dummy function
def predict(row,output):
calc = [len(row.c1)**2,len(row.c2)**2]
output.put([row.c1+' - '+row.c2,sum(calc)])
#dummy data
c = pd.DataFrame(data=[['a','bb'],['ccc','dddd'],['ee','fff'],['gg','hhhh'],['i','jjj']],columns=['c1','c2'])
if __name__ == '__main__':
#output queue
print('initializing output container...')
output = mp.Manager().Queue()
#pool of processes
print('initializing and storing calculations...')
pool = mp.Pool(processes=5)
for i,row in c.iterrows(): #try some smaller subsets here
pool.apply_async(predict,args=(row,output))
#run processes and keep a counter-->I'm not sure what replaces this with Pool!
#for p in processes:
# p.start()
##exit completed processes-->or this!
#for p in processes:
# p.join()
#pool.close() #is this right?
#pool.join() #this?
#store each calculation
print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
print(p)
我得到的输出是:
initializing output container...
initializing and storing calculations...
storing output of calculations...
Traceback (most recent call last):
File "parallel_test.py", line 37, in <module>
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
TypeError: 'Pool' object is not iterable
我想要的是p
打印和看起来像:
0 1
0 a - bb 5
1 ccc - dddd 25
2 ee - fff 13
3 gg - hhhh 20
4 i - jjj 10
如何从每次计算中获取输出而不是第一次?
答案
即使您将所有有用的结果存储在队列output
中,您也希望通过调用output.get()
来获取结果(output
中存储的次数)(训练示例的数量 - 在您的情况下为len(c)
)。对我来说,如果你更改线路它是有效的:
print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
至:
print('storing output of calculations...')
p = pd.DataFrame([output.get() for _ in range(len(c))]) ## <-- no longer breaks
以上是关于如何一起使用多处理池和队列?的主要内容,如果未能解决你的问题,请参考以下文章