multiprocessing.Pool:map_async 和 imap 有啥区别?

Posted

技术标签:

【中文标题】multiprocessing.Pool:map_async 和 imap 有啥区别?【英文标题】:multiprocessing.Pool: What's the difference between map_async and imap?multiprocessing.Pool:map_async 和 imap 有什么区别? 【发布时间】:2014-12-18 16:33:11 【问题描述】:

我正在尝试学习如何使用 Python 的 multiprocessing 包,但我不明白 map_asyncimap 之间的区别。 我注意到map_asyncimap 都是异步执行的。那么我什么时候应该使用其中一个呢?以及我应该如何检索map_async返回的结果?

我应该使用这样的东西吗?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i

【问题讨论】:

【参考方案1】:

接受的答案指出,对于imap_unordered,“一旦准备好,就会产生结果”,人们可能会推断出结果将按完成顺序返回。但我只想明确指出,这不是真的一般。文档声明结果以任意顺序返回。考虑以下程序,该程序使用大小为 4 的池,iterable 大小为 20,chunksize 值为 5。worker 函数根据它的传递的参数,这也确保池中没有一个进程抓取所有提交的任务。因此,我希望池中的每个进程都有 20 / 4 = 5 任务来处理:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = x', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

打印:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到这些结果不是按完成顺序产生的。比如我已经返回了1621512519.7743165 9,后面跟着1621512515.268784 0,这个是worker函数返回的,比之前返回的结果早了4秒多。但是,如果我将 chunksize 值更改为 1,打印输出将变为:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

这个完成顺序。但是,我不敢说imap_unordered 将始终在结果可用时返回结果如果指定了 1 的 chunksize 值,尽管那根据这个实验,似乎是这种情况,因为文档没有提出这样的要求。

讨论

chunksize 指定为 5 时,这 20 个任务被放置在一个输入队列中,供池中的 4 个进程以大小为 5 的块进行处理。因此,一个空闲的进程将将 5 个任务的下一部分从队列中取出,并在再次空闲之前依次处理每个任务。因此,第一个进程将处理 x 参数 0 到 4,第二个进程 x 参数 5 到 9 等。这就是为什么您看到初始 x 值打印为 0、5、10 和 15。

但是,虽然 x 参数 0 的结果在 x 参数 9 的结果之前完成,但结果似乎作为块一起写出,因此 x 参数 0 的结果将不会返回,直到在同一块(即 1、2、3 和 4)中排队的 x 参数的结果也可用。

【讨论】:

谢谢,这是一个好点。我同意你的观察,它看起来像一个 giben 结果值只有在它所属的整个块完成时才对父级可用。【参考方案2】:

imap/imap_unorderedmap/map_async 之间有两个主要区别:

    他们使用您传递给他们的迭代的方式。 他们将结果返回给您的方式。

map 通过将可迭代对象转换为列表(假设它还不是列表)、将其分成块并将这些块发送到 Pool 中的工作进程来消耗您的可迭代对象。将 iterable 分成块比在进程之间一次传递一个 item 的 iterable 中的每个项目执行得更好 - 特别是在 iterable 很大的情况下。但是,将可迭代对象转换为列表以对其进行分块可能会产生非常高的内存成本,因为整个列表都需要保存在内存中。

imap 不会将您提供的可迭代对象转换为列表,也不会将其分成块(默认情况下)。它将一次迭代一个可迭代的元素,并将它们每个发送到一个工作进程。这意味着您不会将整个可迭代对象转换为列表而对内存造成影响,但这也意味着大型可迭代对象的性能较慢,因为缺少分块。但是,可以通过传递大于默认值 1 的 chunksize 参数来缓解这种情况。

imap/imap_unorderedmap/map_async 之间的另一个主要区别是,使用 imap/imap_unordered,您可以在工作人员准备好后立即开始接收结果,而不是必须等待所有这些都完成。使用map_async,会立即返回一个AsyncResult,但在处理完所有结果之前,您实际上无法从该对象中检索结果,此时它返回与map 相同的列表(map实际上在内部实现为map_async(...).get())。没有办法得到部分结果;你要么得到全部结果,要么什么都没有。

imapimap_unordered 都立即返回可迭代对象。使用imap,一旦准备好,结果就会从迭代中产生,同时仍然保留输入迭代的顺序。使用imap_unordered,无论输入可迭代的顺序如何,只要它们准备好就会产生结果。所以,假设你有这个:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print(" (Time elapsed: s)".format(x, int(time.time() - start)))

这将输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

如果您使用p.imap_unordered 而不是p.imap,您会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

如果您使用p.mapp.map_async().get(),您会看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

因此,使用imap/imap_unordered 而不是map_async 的主要原因是:

    您的可迭代对象足够大,将其转换为列表会导致您耗尽/使用太多内存。 您希望能够在所有完成之前开始处理结果。

【讨论】:

apply 和 apply_async 怎么样? @HarshDaftary apply 将单个任务发送到工作进程,然后阻塞直到完成。 apply_async 将单个任务发送到工作进程,然后立即返回一个AsyncResult 对象,该对象可用于等待任务完成并检索结果。 apply 只需调用apply_async(...).get()即可实现 这种描述应该在官方Pool 文档中,而不是the existing dull one。 @BallpointBen 完成后,它将继续进行下一项工作。订单在父进程中处理。 如果您根本不关心返回结果,例如,将处理结果写入磁盘供以后使用,会发生什么?

以上是关于multiprocessing.Pool:map_async 和 imap 有啥区别?的主要内容,如果未能解决你的问题,请参考以下文章

multiprocessing.Pool:map_async 和 imap 有啥区别?

multiprocessing.Pool.map_async() 的结果是不是以与输入相同的顺序返回?

pool.map()不识别主线程中声明的变量

python multiprocessing pool.map() 等到方法完成

如何从multiprocessing.Pool.map的worker_funtion内部为数组赋值?

multiprocessing.Pool.map引发MemoryError