如何根据工作人员结果将额外任务添加到正在运行的多处理池中?

Posted

技术标签:

【中文标题】如何根据工作人员结果将额外任务添加到正在运行的多处理池中?【英文标题】:How to add extra tasks to a running multiprocessing pool, based on worker results? 【发布时间】:2019-06-18 11:52:11 【问题描述】:

假设我有一组 20 个 CPU 繁重任务(每个约 1 小时,但有些需要更长的时间),这些任务通过调用函数来运行,例如Pool.apply_async(function, task_list) 等 PC 有 12 个核心。所以我可以分散负载,并使用所有 12 个内核。

每个任务的结果可能要求必须运行一个新任务(一些任务可能需要 1 次新的运行,其他可能需要 10 次)。

当需要新任务时,我想将该任务生成到现有池 task_list 中,以随时充分优化 CPU 使用率。

目前我运行 20 个任务,等待完成,开始新的 ~18 个任务,等待完成,开始剩余的新任务,....,虽然它发生时只有一个核心被使用了一个小时,而不是12。这加起来计算时间损失了几个小时到几天。我可以在同一个工作人员中运行更新后的任务,但这会导致更大的损失)

使用池似乎无法在池启动时向池中添加更多任务。这是正确的,还是有一些我在到处搜索时错过的聪明方法?

(我看到的唯一选择是使用process而不是pool,并在动态列表上进行while循环,将典型任务作为单个进程启动,同时最多允许12个进程同时运行时间,对于每个任务或新任务,将它们放入动态列表中,并在将任务发送到进程时将其删除。)

【问题讨论】:

apply_async 用于单个函数调用作业并且正在使用一个进程,您的意思是map_async 吗?您可以所有任务都使用异步方法完成之前将新作业发送到现有池中。当结果准备好时,它们还提供注册回调函数。对于您的情况,更简单的方法是将完成任务所需的所有内容放入一个函数中(跳过重新提交另一个任务)并使用pool.mapchunksize=1。与 Pool 的 chunksize 高度相关的背景,您可以找到 here。 谢谢,确实应该是一个 map 选项,chunksize 肯定需要为 1。将任务保持在同一个函数中会产生最后启动的工作函数运行 10 小时的风险,而其他的则是闲置的。您的建议 task_list 可以结合@asafpr 的回答帮助我理解“队列()”功能,所以目前我的猜测是当我使用 task_list 作为池中“args”的队列并添加任务时它应该工作。找到了一个使用“进程”而不是“池”的示例,希望在本周末晚些时候更新和清理。 Python 帮助也建议这样做:docs.python.org/2/library/…(打开链接时的第一个示例) 【参考方案1】:

您可以使用队列,您可以在此处查看示例: https://www.journaldev.com/15631/python-multiprocessing-example 通过这种方式,您将能够添加到队列中并有恒定数量的跑步者从队列中取出。

【讨论】:

请贴出相关代码,而不仅仅是链接。

以上是关于如何根据工作人员结果将额外任务添加到正在运行的多处理池中?的主要内容,如果未能解决你的问题,请参考以下文章

任务三

如何将 Let encrypt 添加到在 Elastic Beanstalk 上运行的多容器中

你如何在 sbt 中编写任务?

Git 常用命令

如何将交换添加到 EC2 实例?

如何将Java ChangeListener添加到方法返回