如何限制并发worker的数量?

Posted

技术标签:

【中文标题】如何限制并发worker的数量?【英文标题】:How to limit the number of concurrent workers? 【发布时间】:2015-12-06 11:38:28 【问题描述】:

我有一个函数,我想并行执行多次,但同时只有定义数量的实例。

这样做的自然方法似乎是使用multiprocessing.Pool。具体来说,文档说

频繁模式 (...) 是允许一个工人在一个 池在退出之前只完成一定数量的工作,被 清理并产生了一个新进程来替换旧进程。这 池的maxtasksperchild 参数将这种能力暴露到最后 用户。

maxtasksperchild 定义为:

maxtasksperchild 是工作进程可以执行的任务数 在它退出并被新的工人取代之前完成 进程,以释放未使用的资源。默认 maxtasksperchild 为 None,这意味着工作进程将作为 和游泳池一样长。

我不清楚 task 在这里是什么意思。例如,如果我只想让我的工作人员最多并行运行 4 个实例,我应该将multiprocessing.Pool 启动为

pool = multiprocessing.Pool(processes=4, maxtasksperchild=4)

processesmaxtasksperchild 如何协同工作?我可以将processes 设置为 10 并且仍然只有 4 个工作人员在运行(实际上有 6 个进程空闲吗?)

【问题讨论】:

当您执行p.map(f, s) 其中pPool 时,序列s 的每个元素都算作一项任务。 p.apply(f) 算作一项任务。 你把maxtasksperchild的意思全弄错了。但是,为什么您要保留大约 6 个空闲进程?这比只创建 4 个执行实际工作的进程有什么好处? @DanD。谢谢 - 一旦完成另一项任务,任何进程(在我的情况下为 4 个)是否会使用这些任务? (他们排队等待处理?) @shx2:你完全理解了 maxtasksperchild 的含义——可能,但这不是很有帮助。至于第二部分 - 我使用 example 来尝试理解两个参数之间的关系,这显然不是真正的 prod 代码。 是的,每个进程在完成当前任务后会获得另一项任务(如果有一个任务可用)。 Pool 内部是一个任务队列。 【参考方案1】:

正如doc 所说(也在你的描述中),

processes是可以一起运行的parallel worker的数量,如果不设置,它将与您计算机中的CPU数量相同。

maxtasksperchild 是每个进程可以处理的最大任务数,这意味着如果完成的任务数达到 maxtasksperchild,则该进程将被杀死并启动一个新进程并添加到池中

让我检查一下代码:

def f(x):
    print "pid: ", os.getpid(), " deal with ", x
    sys.stdout.flush()

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=2)
    keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    result = pool.map(f, keys)

这里我们使用 4 个进程,每个进程在执行 2 个任务后都会被杀死。代码执行后,可以看到:

pid:  10899  deal with  1
pid:  10900  deal with  2
pid:  10901  deal with  3
pid:  10899  deal with  5
pid:  10900  deal with  6
pid:  10901  deal with  7
pid:  10902  deal with  4
pid:  10902  deal with  8
pid:  10907  deal with  9
pid:  10907  deal with  10

进程 [10899-10902] 在每个进程执行 2 个任务后被杀死,并使用新进程 10907 执行最后一个。

作为比较,如果我们使用更大的 maxtasksperchild 或默认值(这意味着进程永远不会被杀死并且只要 Pool 就活着),如下代码:

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=10)
    keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    result = pool.map(f, keys)

结果:

pid:  13352  deal with  1
pid:  13353  deal with  2
pid:  13352  deal with  4
pid:  13354  deal with  3
pid:  13353  deal with  6
pid:  13352  deal with  7
pid:  13355  deal with  5
pid:  13354  deal with  8
pid:  13353  deal with  9
pid:  13355  deal with  10

如您所见,没有创建新流程,所有任务都使用原来的 4 个流程完成。

希望这个有用~

【讨论】:

是的,谢谢。我只是想确保(也来自@DanD. 的评论)任务已排队,等待一个免费进程来接他们。只有processes 数量的此类进程处于活动状态。 (?° 我不确定源代码,但我猜任务在类似 multiprocessing.Queue 的结构中排队,这是一种多生产者 - 消费者模式,因此应该在多个进程之间共享队列。我认为只要池管理它们,进程就会活着~

以上是关于如何限制并发worker的数量?的主要内容,如果未能解决你的问题,请参考以下文章

如何限制并发异步 I/O 操作的数量?

如何限制由 Pig 脚本启动的并发作业数量?

如何在 asyncio python 中使用 subprocess 模块限制并发进程的数量

nginx优化

如何限制并发的 异步IO 请求数量?

怎么优化设置apache的并发数量