Python 多处理:在第一个子错误时中止映射

Posted

技术标签:

【中文标题】Python 多处理:在第一个子错误时中止映射【英文标题】:Python multiprocessing: abort map on first child error 【发布时间】:2019-02-15 17:54:25 【问题描述】:

当其中一个孩子中止和/或抛出异常时,中止多处理的正确方法是什么?

我发现了各种各样的问题(generic multiprocessing error handling、how to close multiprocessing pool on exception but without answer、...),但对于如何停止子异常的多处理没有明确的答案。

例如,我期望以下代码:

def f(x):
    sleep(x)
    print(f"f(x)")
    return 1.0 / (x - 2)


def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: e")
            p.close()
            p.terminate()
    print("end")


if __name__ == '__main__':
    main()

输出:

f(0)
f(1)
f(2)
oops: float division by zero
end

相反,它在检测/处理异常之前对所有项目应用f 函数:

f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end

有没有办法直接捕获异常?

【问题讨论】:

请注意,异常只会在主进程 map 完成后重新引发。如果您确实想使用map,则无法更早地捕获它,因为该异常不会更早存在。那么,问题是您是要坚持使用map 还是正在寻找自定义解决方案? 哪个python版本?我的 Python:3.4.2 的行为符合您的预期。 @stovfl 我已经编辑了我的帖子,我正在使用 Python 3.6.5 @MisterMiyagi 我正在寻找最简单的带有错误处理的多处理 - 我想避免添加包装类或等效的处理它。 【参考方案1】:

我认为您将需要apply_async,因此您可以对每一个结果而不是累积结果采取行动。 pool.apply_async 提供了一个 error_callback 参数,您可以使用它来注册您的错误处理程序。 apply_async 没有阻塞,所以你需要join() 池。我还使用了一个标志terminated 来知道什么时候可以正常处理结果,以防万一没有发生异常。

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f(x)")
    return 1.0 / (x - 2)

def on_error(e):
    global terminated
    terminated = True
    pool.terminate()
    print(f"oops:e")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())

    print("end")


if __name__ == '__main__':
    main()

【讨论】:

以上是关于Python 多处理:在第一个子错误时中止映射的主要内容,如果未能解决你的问题,请参考以下文章

在解析错误时中止并显示有用的消息

错误处理只工作一次

在向外部依赖项添加扩展时中止陷阱6

在片段分离时中止加载 AsyncTaskLoader

GCDWebServer 总是在选项检查时中止

如何在线程休眠时中止线程