多处理池是不是为每个进程提供相同数量的任务,或者它们是不是被分配为可用?

Posted

技术标签:

【中文标题】多处理池是不是为每个进程提供相同数量的任务,或者它们是不是被分配为可用?【英文标题】:Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?多处理池是否为每个进程提供相同数量的任务,或者它们是否被分配为可用? 【发布时间】:2012-10-27 04:43:46 【问题描述】:

当您 map 可迭代到 multiprocessing.Pool 时,迭代是否在开始时为池中的每个进程划分为一个队列,或者是否存在一个公共队列,当一个进程空闲时从该队列中获取任务?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

因此,鉴于此未经测试的建议代码;如果池中有 4 个进程,每个进程是否被分配了 25 个任务,或者 100 个任务被进程一个接一个地挑选出来,以便每个进程执行不同数量的任务,例如 30 , 26, 24, 20.

【问题讨论】:

这与您的问题无关,但如果您的可迭代对象是生成器或其他惰性类型,您可能希望使用imap 而不是map,并传递一个明确的@ 987654326@参数。 哦,它是相关的,并且适用,因为我不确定chunksize 的默认值是什么map - 指定默认值的省略支持了我在下面的 cmets 中的怀疑 - 它块一开始就对每个过程进行同样的处理。 正如我在回答中提到的,您可以阅读源代码。 map 接受 chunksize=None。然后,在map_async(它使用)中,if chunksize is None 设置chunksize, extra = divmod(len(iterable), len(self.pool) * 4)(然后是if extrachunksize += 1)。所以,如果你有 8 个工人和 100 个工作,chunksize 将是 4。 太棒了;还解释了为什么map 在开始时贯穿整个可迭代对象——它正在寻找len。我看看我是否要去yield,那么无论如何我应该使用imap。谢谢大家! 正如我在下面所说的,这是一种权衡。 map 贯穿整个可迭代对象,这意味着在启动和/或运行内存之前会有延迟(对于 100 个整数来说没什么大不了的,但是对于 1000 个网络蜘蛛结果来说,这可能是不可接受的,更不用说 @987654342 @…)。但它更简单一些,你会得到默认的chunksize,而不必计算/测量/猜测一个。 【参考方案1】:

要估计 Python 实现使用的 chunksize 而无需查看其 multiprocessing 模块源代码,请运行:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else 

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

这表明imapimap_unordered默认使用chunksize=1max_chunksize用于map取决于nprocessesnitem(每个进程的块数不固定)和max_chunksize取决于python版本。如果指定了chunksize 参数,所有*map* 函数都会考虑它。

用法

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

查看各个职位的分配情况;指定--verbose参数。

【讨论】:

【参考方案2】:

因此,鉴于此未经测试的建议代码;如果池中有 4 个进程,每个进程是否被分配了 25 个任务,或者 100 个任务被进程一个接一个地挑选出来,以便每个进程执行不同数量的任务,例如 30 , 26, 24, 20.

嗯,显而易见的答案是测试它。

按原样,测试可能不会告诉您太多,因为作业将尽快完成,而且即使池化进程在准备就绪时抢占作业,事情最终也可能会均匀分布。但是有一个简单的方法可以解决这个问题:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

如果数字是“参差不齐的”,则您知道池中的进程必须在准备就绪时抓取新作业。 (我明确地将chunksize 设置为 1 以确保块不会太大以至于每个块首先只得到一个块。)

当我在 8 核机器上运行它时:

Counter(98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9)

因此,看起来流程正在快速获得新的工作。

因为你专门问了4个工人,我把Pool()改成Pool(4),得到了这个:

Counter(98965: 31, 98962: 24, 98964: 23, 98963: 22)

但是,有一种比测试更好的方法来找出答案:阅读the source。

如您所见,map 只是调用map_async,这会创建一堆批次并将它们放在self._taskqueue 对象(Queue.Queue 实例)上。如果您进一步阅读,此队列不会直接与其他进程共享,但是有一个池管理器线程,每当一个进程完成并返回结果时,它就会从队列中弹出下一个作业并将其提交回进程。

这也是您可以找出map 的默认块大小的方法。上面链接的 2.7 实现表明它只是对 len(iterable) / (len(self._pool) * 4) 进行了四舍五入(比避免小数算术稍微详细一点)——或者,换一种说法,对于每个进程大约 4 个块来说足够大。但是你真的不应该依赖这个;该文档含糊地和间接地暗示它将使用某种启发式方法,但并没有为您提供任何关于那将是什么的保证。因此,如果您真的需要“每个进程大约 4 个块”,请明确计算。更现实地说,如果您需要除默认值之外的任何内容,您可能需要一个特定于域的值,您将要计算出(通过计算、猜测或分析)。

【讨论】:

谢谢队友。至于测试,我不确定如何计算。我在想我必须弄清楚如何共享变量或其他东西。计数过程非常有见地。收盘后是否需要pool.join() 以确保在吐出计数之前完成所有操作? 请记住 map 为每个作业返回一个值并将它们加入一个列表(map_asyncimapimap_unordered 以不同的方式为您提供相同的信息),所以您很少需要进行任何进程间共享来获取这样的信息。 至于join,在这种情况下你不需要它:map 在返回之前阻塞直到它的所有 100 个结果,并且没有其他代码提交作业。但是,是的,如果您想尝试其他方法来获得工作机会,您可能需要它。【参考方案3】:

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

此方法将可迭代对象切成若干块 作为单独的任务提交到进程池。 (近似)大小 可以通过将 chunksize 设置为正数来指定这些块中的哪一个 整数。

我假设一个进程在处理完前一个块后会从队列中提取下一个块。

默认的chunksize 取决于iterable 的长度,因此选择的块数大约是进程数的四倍。 (source)

【讨论】:

我注意到imap 的默认块大小指定为1,我想知道map 的默认值是多少?对于我的应用程序现在正在做的事情,我怀疑它在开始时将地图分成相等的块;但不确定 - 因此是这个问题。 @JohnMee:imap 的默认值为 1 的原因是 imap 不知道 iterable 的长度,因此无法启发式地猜测最佳 chunksize。 (是的,这意味着有一个折衷——有时实际上从iterable 中构建一个list 更快,只是为了获得启发。但通常,无论如何,你可以想出一个更好的chunksize,只需知道问题空间。)

以上是关于多处理池是不是为每个进程提供相同数量的任务,或者它们是不是被分配为可用?的主要内容,如果未能解决你的问题,请参考以下文章

线程池,进程和线程的理解

开启线程池和进程池

Python 多处理:最大。池工作进程的数量?

第三十三天

进程-进程池Pool

进程-进程池Pool