How to accumulate results from pool.apply_async calls?
Posted: 2020-02-13 12:45:16

Question: I would like to call pool.apply_async(func) and accumulate each result as soon as it becomes available, without the calls waiting for one another.

import multiprocessing
import numpy as np

chrNames = ['chr1', 'chr2', 'chr3']
sims = [1, 2, 3]

def accumulate_chrBased_simBased_result(chrBased_simBased_result, accumulatedSignalArray, accumulatedCountArray):
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]
    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray

def func(chrName, simNum):
    print('%s %d' % (chrName, simNum))
    result = []
    signal_array = np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)
    return result

if __name__ == '__main__':
    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)
    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    for chrName in chrNames:
        for simNum in sims:
            result = pool.apply_async(func, (chrName, simNum,))
            accumulate_chrBased_simBased_result(result.get(), accumulatedSignalArray, accumulatedCountArray)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)
Written this way, each pool.apply_async call waits for the previous one to finish. Is there a way to get rid of this mutual waiting?
Answer 1:

You call result.get() on every iteration, which makes the main process wait until func has finished before the next task is even submitted.

Below is a working version. The prints show that accumulation happens as soon as each 'func' is ready, and a random sleep is added so that the execution times differ noticeably.

import multiprocessing
import numpy as np
from time import time, sleep
from random import random

chrNames = ['chr1', 'chr2', 'chr3']
sims = [1, 2, 3]

def accumulate_chrBased_simBased_result(chrBased_simBased_result, accumulatedSignalArray, accumulatedCountArray):
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]
    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray

def func(chrName, simNum):
    result = []
    sleep(random() * 5)  # random delay so the tasks finish at clearly different times
    signal_array = np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)
    print('%s %d' % (chrName, simNum))
    return result

if __name__ == '__main__':
    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)
    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    # submit all tasks first, without blocking on any of their results
    results = []
    for chrName in chrNames:
        for simNum in sims:
            results.append(pool.apply_async(func, (chrName, simNum,)))

    for i in results:
        print(i)

    # accumulate each result as soon as it becomes ready
    while results:
        for r in results[:]:
            if r.ready():
                print('{} is ready'.format(r))
                accumulate_chrBased_simBased_result(r.get(), accumulatedSignalArray, accumulatedCountArray)
                results.remove(r)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)
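
One detail to be aware of: the while results loop above polls r.ready() in a tight loop, so it keeps one CPU core busy while it waits. A short sleep between passes keeps the behaviour but avoids the busy wait; a minimal sketch (the 0.1 s interval is an arbitrary choice, not part of the original answer):

    while results:
        for r in results[:]:
            if r.ready():
                print('{} is ready'.format(r))
                accumulate_chrBased_simBased_result(r.get(), accumulatedSignalArray, accumulatedCountArray)
                results.remove(r)
        sleep(0.1)  # check again shortly instead of spinning at full speed
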
Comments:
Thank you very much. My only concern is the memory usage of results when there are many chrNames and sims: will results take up a lot of space? Is there a way to accumulate the results of the pool.apply_async calls as they become available, without collecting them in a list-like structure? That might reduce the memory footprint.

I don't think you will run into that problem, because apply_async actually returns a multiprocessing.pool.AsyncResult. You have to call get() on the AsyncResult to actually retrieve the value.