pathos pools:在 N 个任务后更新工作进程

Posted

技术标签:

【中文标题】pathos pools:在 N 个任务后更新工作进程【英文标题】:pathos pools: Renew worker processes after N tasks 【发布时间】:2019-12-27 23:35:44 【问题描述】:

我正在构建一个并行 python 应用程序,它本质上调用了一个围绕外部库的 C 包装器。需要并行性才能在所有 CPU 内核上同时运行计算。

我最终使用了pathos.multiprocessing.ProcessPool,但这些池缺少标准multiprocessing.Pool 类构造函数(see reference here) 的maxtaskperchild 参数。我需要这个功能,因为 C 库依赖于进程时钟来定义一些执行时间限制,这些时间限制最终会在任务堆积起来时达到。

有没有办法让ProcessPool经理在完成给定数量的任务后更新工作进程?

阐明我的意图的示例代码:

from pathos.pools import ProcessPool
from os import getpid
import collections

def print_pid(task_id):
    pid = getpid()
    return pid

if __name__ == "__main__":
    NUM_TASKS = 50
    MAX_PER_CHILD = 2


    # limit each process to maximum MAX_PER_CHILD tasks
    # we would like the pool to exit the process and spawn a new one
    # when a task counter reaches the limit
    # below argument 'maxtasksperchild' would work with standard 'multiprocessing'
    pool = ProcessPool(ncpu=2, maxtasksperchild=MAX_PER_CHILD)
    results = pool.map(print_pid, range(NUM_TASKS), chunksize=1)

    tasks_per_pid = dict(collections.Counter(results))
    print(tasks_per_pid)

# printed result
# 918: 8, 919: 6, 920: 6, 921: 6, 922: 6, 923: 6, 924: 6, 925: 6
# observe that all processes did more than MAX_PER_CHILD tasks

我尝试了什么

ProcessPool 构造函数中设置maxtasksperchild(参见上面的简单示例)似乎没有任何作用 在工作函数中调用sys.exit() 会使程序挂起 我在深入研究源代码时发现了一些提示

【问题讨论】:

【参考方案1】:

pathos.multiprocessing 中有两个池: ProcessPool_ProcessPool。前者旨在拥有一个增强的池生命周期,最大限度地减少启动时间,并具有持久性和重新启动功能——但是,缺少一些“multiprocessing”关键字。后者 (_ProcessPool) 是 API 设计的一个级别,并提供与multiprocessing Pool 接口相同的接口(但使用dill)。所以,看看_ProcessPool

【讨论】:

感谢您维护这个库! _ProcessPool 是否有机会在未来消失,因为它没有记录在案? 以后再也没有机会消失了。它实际上在另外两个地方显示为完全相同的对象......如果你查看pathos.pools,它也是_ProcessPool,并且两个模块都导入pathos.helpers.ProcessPool as _ProcessPoolpathos ProcessPool 建立在它之上,它不会消失。

以上是关于pathos pools:在 N 个任务后更新工作进程的主要内容,如果未能解决你的问题,请参考以下文章

使用 pathos ProcessingPool 的地图时如何设置块大小?

python multiprocessing.Pool 太多文件打开日志文件

pathos.ProcessingPool 和 pickle 之间的交互

boost::asio::thread_pool - 如何在工作完成前取消工人?

Pathos处理池递归限制

超时后如何中止 multiprocessing.Pool 中的任务?