如何限制并发worker的数量?
Posted
技术标签:
【中文标题】如何限制并发worker的数量?【英文标题】:How to limit the number of concurrent workers? 【发布时间】:2015-12-06 11:38:28 【问题描述】:我有一个函数,我想并行执行多次,但同时只有定义数量的实例。
这样做的自然方法似乎是使用multiprocessing.Pool
。具体来说,文档说
频繁模式 (...) 是允许一个工人在一个 池在退出之前只完成一定数量的工作,被 清理并产生了一个新进程来替换旧进程。这 池的
maxtasksperchild
参数将这种能力暴露到最后 用户。
maxtasksperchild
定义为:
maxtasksperchild
是工作进程可以执行的任务数 在它退出并被新的工人取代之前完成 进程,以释放未使用的资源。默认 maxtasksperchild 为 None,这意味着工作进程将作为 和游泳池一样长。
我不清楚 task 在这里是什么意思。例如,如果我只想让我的工作人员最多并行运行 4 个实例,我应该将multiprocessing.Pool
启动为
pool = multiprocessing.Pool(processes=4, maxtasksperchild=4)
processes
和 maxtasksperchild
如何协同工作?我可以将processes
设置为 10 并且仍然只有 4 个工作人员在运行(实际上有 6 个进程空闲吗?)
【问题讨论】:
当您执行p.map(f, s)
其中p
是Pool
时,序列s
的每个元素都算作一项任务。 p.apply(f)
算作一项任务。
你把maxtasksperchild
的意思全弄错了。但是,为什么您要保留大约 6 个空闲进程?这比只创建 4 个执行实际工作的进程有什么好处?
@DanD。谢谢 - 一旦完成另一项任务,任何进程(在我的情况下为 4 个)是否会使用这些任务? (他们排队等待处理?)
@shx2:你完全理解了 maxtasksperchild 的含义——可能,但这不是很有帮助。至于第二部分 - 我使用 example 来尝试理解两个参数之间的关系,这显然不是真正的 prod 代码。
是的,每个进程在完成当前任务后会获得另一项任务(如果有一个任务可用)。 Pool 内部是一个任务队列。
【参考方案1】:
正如doc 所说(也在你的描述中),
processes是可以一起运行的parallel worker的数量,如果不设置,它将与您计算机中的CPU数量相同。
maxtasksperchild 是每个进程可以处理的最大任务数,这意味着如果完成的任务数达到 maxtasksperchild,则该进程将被杀死并启动一个新进程并添加到池中
让我检查一下代码:
def f(x):
print "pid: ", os.getpid(), " deal with ", x
sys.stdout.flush()
if __name__ == '__main__':
pool = Pool(processes=4, maxtasksperchild=2)
keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = pool.map(f, keys)
这里我们使用 4 个进程,每个进程在执行 2 个任务后都会被杀死。代码执行后,可以看到:
pid: 10899 deal with 1
pid: 10900 deal with 2
pid: 10901 deal with 3
pid: 10899 deal with 5
pid: 10900 deal with 6
pid: 10901 deal with 7
pid: 10902 deal with 4
pid: 10902 deal with 8
pid: 10907 deal with 9
pid: 10907 deal with 10
进程 [10899-10902] 在每个进程执行 2 个任务后被杀死,并使用新进程 10907 执行最后一个。
作为比较,如果我们使用更大的 maxtasksperchild 或默认值(这意味着进程永远不会被杀死并且只要 Pool 就活着),如下代码:
if __name__ == '__main__':
pool = Pool(processes=4, maxtasksperchild=10)
keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
result = pool.map(f, keys)
结果:
pid: 13352 deal with 1
pid: 13353 deal with 2
pid: 13352 deal with 4
pid: 13354 deal with 3
pid: 13353 deal with 6
pid: 13352 deal with 7
pid: 13355 deal with 5
pid: 13354 deal with 8
pid: 13353 deal with 9
pid: 13355 deal with 10
如您所见,没有创建新流程,所有任务都使用原来的 4 个流程完成。
希望这个有用~
【讨论】:
是的,谢谢。我只是想确保(也来自@DanD. 的评论)任务已排队,等待一个免费进程来接他们。只有processes
数量的此类进程处于活动状态。 (?°
我不确定源代码,但我猜任务在类似 multiprocessing.Queue 的结构中排队,这是一种多生产者 - 消费者模式,因此应该在多个进程之间共享队列。我认为只要池管理它们,进程就会活着~以上是关于如何限制并发worker的数量?的主要内容,如果未能解决你的问题,请参考以下文章