如何运行嵌套的、分层的 pathos 多处理地图?
Posted
技术标签:
【中文标题】如何运行嵌套的、分层的 pathos 多处理地图?【英文标题】:How to run nested, hierarchical pathos multiprocessing maps? 【发布时间】:2017-02-24 22:35:07 【问题描述】:在我的 dill 序列化/酸洗代码中构建了重要部分后,我还尝试使用 pathos 多处理来并行化我的计算。 Pathos 它是莳萝的自然延伸。
尝试嵌套运行时
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
在另一个ProcessingPool().map
中,然后我收到:
AssertionError: daemonic processes are not allowed to have children
例如:
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
产量
AssertionError: daemonic processes are not allowed to have children
我尝试使用amap(...).get()
没有成功。这是在 pathos 0.2.0 上。
允许嵌套并行化的最佳方式是什么?
更新
在这一点上我必须诚实,并承认我已经从 pathos 中删除了断言 "daemonic processes are not allowed to have children"
。我还构建了一些将KeyboardInterrupt
级联到工人和工人的东西......下面的部分解决方案:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: '.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: '.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
似乎可以在控制台和 IPython 笔记本上工作(带有停止按钮),但不确定在所有极端情况下它是否 100% 正确。
【问题讨论】:
我是pathos
作者。你不能让进程产生进程的原因是它们没有适当地死亡,并且你有最终会挂起的僵尸进程。我会推荐@Yoda 的解决方案,因为这是典型情况……一个“昂贵”的并行块和几个“轻量级”并行工作。 pathos
也有 ParallelPool
,它速度较慢,但如果您需要线程以外的东西,则可以使用。我还建议尝试使用非阻塞地图,因为阻塞会减慢您的速度。另见:***.com/questions/28203774
@MikeMcKerns,我开始以多种方式(包括非守护进程)对代码进行试验,并最终完成了上述工作。还包括amap
,但由于其他原因,Ctrl+C
并没有让我离开map
。不幸的是,不能使用“轻量级”技巧,因为在寻找悲情时这已经是一个更大的系统(在莳萝之后)。现在下一个挑战是拥有某种共享内存(读写所有进程),这似乎很难使用我的级联解决方案......顺便说一句,很棒的工具,谢谢!
我无法想象在不能使用其他池之一(ThreadingPool
或 ParallelPool
)来提供嵌套并行性的情况下,您会有什么样的工作流程,并且需要ProcessingPools
...是的,通过删除断言,嵌套的ProcessingPools
应该可以工作。然而,断言存在的原因是嵌套的生成池往往像僵尸一样继续存在。使用它们的作业 ID 杀死僵尸进程是一种解决方法。
只是理解您最初的建议,抱歉。 ParallelPool
实际上看起来很完美!现在,代码可以在需要的任何地方创建新进程(在检查是否有足够的资源之后)。我可以将调度程序构建为基于套接字的服务器,它将接受腌制作业以执行。并非不可能,只是需要一些重构。谢谢!
好的,太好了。如果您觉得自己找到了比目前所提供的更好的答案,您应该回答自己的问题。
【参考方案1】:
我遇到了完全相同的问题。就我而言,内部操作是需要并行性的操作,所以我做了ThreadingPool
和ProcessingPool
。这是您的示例:
from pathos.multiprocessing import ProcessingPool, ThreadingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ThreadingPool().map(refork, xrange(3))
您甚至可以拥有另一个带有另一个外部线程池的层。根据您的情况,您可以颠倒这些池的顺序。但是,您不能拥有进程的进程。如果确实需要,请参阅:https://***.com/a/8963618/6522112。我自己还没有尝试过,所以我无法详细说明。
【讨论】:
有道理,不幸的是,在我的情况下,我不能说哪个级别会提前大量计算,我也不能轻易限制到 2 个级别的并行化。 我提供的解决方案似乎有效,但有时外部池似乎永远挂起。我试过imap
和amap
没有运气。也许@MikeMcKerns 可以启发这个? ParallelPool
有帮助吗?
本质上,python 的 multiprocessing.Pool
在它的父级被杀死时不会干净地杀死。 multiprocess
和 pathos
也有同样的问题,因为它们重用了相同的代码。 pathos.pools.ParallelPool
不是从 multiprocessing
派生的(而是从 pp
派生的),因此它不会遇到同样的问题……但是,序列化较弱(它是通过“源提取”而不是“酸洗”来完成的,并且它不允许共享对象)。
仅供参考:源提取是dill.source
,而正常酸洗是dill
。
pool.terminate()
和 pool.join()
怎么样?我的理解是,它会适当地杀死孩子。我看到了timeout
选项,但没有选项可以检测是否还有工作。如果有一个,一个可以做以上是关于如何运行嵌套的、分层的 pathos 多处理地图?的主要内容,如果未能解决你的问题,请参考以下文章