How to accumulate results from pool.apply_async call?

Posted: 2020-02-13 12:45:16

Question:

I want to call pool.apply_async(func) and accumulate each result as soon as it becomes available, without the calls waiting on one another.


import multiprocessing
import numpy as np

chrNames=['chr1','chr2','chr3']
sims=[1,2,3]



def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]

    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray


def func(chrName,simNum):
    print('%s %d' %(chrName,simNum))

    result=[]
    signal_array=np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)

    return result


if __name__ == '__main__':

    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)

    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    for chrName in chrNames:
        for simNum in sims:
            result= pool.apply_async(func, (chrName,simNum,))
            accumulate_chrBased_simBased_result(result.get(),accumulatedSignalArray,accumulatedCountArray)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)



Written this way, each pool.apply_async call waits for the previous one to finish. Is there a way to get rid of this mutual waiting?

Answer 1:

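You call result.get() in every iteration, which makes the main process block until that task has finished before it submits the next one. As a result, the tasks effectively run one at a time.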

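Below is a working version. The print output shows that accumulation happens as soon as each `func` call is ready, and a random sleep has been added to make the execution times differ noticeably.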

import multiprocessing
import numpy as np
from time import time, sleep
from random import random

chrNames=['chr1','chr2','chr3']
sims=[1,2,3]



def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):    
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]

    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray


def func(chrName,simNum):

    result=[]
    sleep(random()*5)
    signal_array=np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)
    print('%s %d' %(chrName,simNum))

    return result


if __name__ == '__main__':

    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)

    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    results = []
    for chrName in chrNames:
        for simNum in sims:
            results.append(pool.apply_async(func, (chrName,simNum,)))

    for i in results:
        print(i)

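    # Poll the pending results and accumulate each one as soon as it is ready.
    while results:
        for r in results[:]:
            if r.ready():
                print('{} is ready'.format(r))
                accumulate_chrBased_simBased_result(r.get(),accumulatedSignalArray,accumulatedCountArray)
                results.remove(r)
        sleep(0.01)  # short pause so the polling loop does not pin a CPU core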

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)
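
As a possible alternative to polling `ready()`, `Pool.imap_unordered` yields results in the order the tasks finish. The sketch below is not part of the original answer. It assumes `func` can be changed to take a single `(chrName, simNum)` tuple, and the names `accumulate_unordered` and `ARRAY_LEN` are made up for illustration.

```python
import multiprocessing
from itertools import product

import numpy as np

ARRAY_LEN = 10000  # assumed: same array length as in the code above


def func(args):
    # imap_unordered passes one argument per task, so unpack the tuple here.
    chr_name, sim_num = args
    signal_array = np.full((ARRAY_LEN,), sim_num, dtype=float)
    count_array = np.full((ARRAY_LEN,), sim_num, dtype=int)
    return signal_array, count_array


def accumulate_unordered(pool, chr_names, sims):
    # Hypothetical helper: sums the arrays in whatever order tasks complete.
    acc_signal = np.zeros((ARRAY_LEN,), dtype=float)
    acc_count = np.zeros((ARRAY_LEN,), dtype=int)
    tasks = product(chr_names, sims)
    for signal_array, count_array in pool.imap_unordered(func, tasks):
        acc_signal += signal_array
        acc_count += count_array
    return acc_signal, acc_count


if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        signal, count = accumulate_unordered(
            pool, ['chr1', 'chr2', 'chr3'], [1, 2, 3])
    print(signal)
    print(count)
```

Because addition is commutative, the completion order does not affect the accumulated totals.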

Comments:

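Thank you very much. My only concern is the memory usage of results when there are many chrNames and sims. Will results take up a lot of space then?

Is there a way to accumulate the results of the pool.apply_async calls as they arrive, without collecting them in a list-like structure? That might reduce the memory footprint.

I don't think you will run into that problem, because apply_async actually returns a multiprocessing.pool.AsyncResult object. You need to call get() on the AsyncResult to actually obtain the value.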
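
One way to accumulate without keeping a list of AsyncResult objects is the `callback` parameter of `apply_async`, which the pool invokes in the parent process with each return value. The following is a minimal sketch under that assumption, not something proposed in the thread. The `Accumulator` class and `accumulate_with_callbacks` function are hypothetical names, and the memory benefit has not been measured here.

```python
import multiprocessing

import numpy as np

ARRAY_LEN = 10000  # assumed: same array length as in the answer's code


def func(chr_name, sim_num):
    signal_array = np.full((ARRAY_LEN,), sim_num, dtype=float)
    count_array = np.full((ARRAY_LEN,), sim_num, dtype=int)
    return [signal_array, count_array]


class Accumulator:
    """Hypothetical helper that holds the running totals."""

    def __init__(self):
        self.signal = np.zeros((ARRAY_LEN,), dtype=float)
        self.count = np.zeros((ARRAY_LEN,), dtype=int)
        self.errors = []

    def add(self, result):
        # Called in the parent process by the pool's result-handling thread.
        self.signal += result[0]
        self.count += result[1]

    def fail(self, exc):
        # Without an error_callback (or a get() call) failures go unnoticed.
        self.errors.append(exc)


def accumulate_with_callbacks(pool, chr_names, sims):
    acc = Accumulator()
    for chr_name in chr_names:
        for sim_num in sims:
            # The returned AsyncResult is deliberately not stored.
            pool.apply_async(func, (chr_name, sim_num),
                             callback=acc.add, error_callback=acc.fail)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait until every task and its callback has finished
    return acc


if __name__ == '__main__':
    acc = accumulate_with_callbacks(
        multiprocessing.Pool(), ['chr1', 'chr2', 'chr3'], [1, 2, 3])
    print(acc.signal)
    print(acc.count)
    print('errors:', acc.errors)
```

Callbacks should return quickly, since the thread that runs them also handles the results of all other tasks.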
