在 concurrent.futures 中检测失败的任务

Posted

技术标签:

【中文标题】在 concurrent.futures 中检测失败的任务【英文标题】:Detect failed tasks in concurrent.futures 【发布时间】:2016-06-13 04:15:24 【问题描述】:

我一直在使用 concurrent.futures,因为它有一个简单的界面,让用户可以轻松控制最大线程/进程数。但是,concurrent.futures 似乎隐藏了失败的任务并在所有任务完成/失败后继续主线程。

import concurrent.futures

def f(i):
    return (i + 's')

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    fs = [executor.submit(f, i ) for i in range(10)]
    concurrent.futures.wait(fs)

对任何整数调用 f 都会导致 TypeError。但是,整个脚本运行良好并以代码 0 退出。有没有办法让它在任何线程失败时抛出异常/错误?

或者,有没有更好的方法来限制线程/进程的数量而不使用 concurrent.futures?

【问题讨论】:

相关:***.com/questions/33448329/… 【参考方案1】:

concurrent.futures.wait 将确保所有任务都已完成,但它不会检查成功(return-ed)与失败(引发异常且未在工作函数中捕获)。为此,您需要在每个Future 上调用.result()(这将导致它要么重新raise 来自任务的异常,要么产生return-ed 值)。还有其他方法可以在不实际引发主线程的情况下进行检查(例如.exception()),但.result() 是最直接的方法。

如果你想让它重新raise,最简单的方法就是将wait()调用替换为:

for fut in concurrent.futures.as_completed(fs):
    fut.result()

它将在Futures 完成时处理结果,并在发生时立即处理raiseException。或者,您可以继续使用 wait,以便所有任务在检查任何异常之前完成,然后直接迭代 fs 并在每个任务上调用 .result()

【讨论】:

【参考方案2】:

还有另一种方法可以使用 multiprocessing.Pool(用于进程)或 multiprocessing.pool.ThreadPool(用于线程)。据我所知,它会重新抛出任何捕获的异常。

【讨论】:

这个问题是关于concurrent.futuresAFAIK,这是多处理模块的更现代的实现。对我来说,建议使用旧库是没有意义的。 @guettli,这个旧库仍然被许多并行库积极使用,例如Joblib 和 Dask。而且 future 在功能上并不等同于具有 map 函数的 Pool - 这可能比等待多个未来对象更有效。 是的,许多库都在积极使用多处理。但是这个问题的标题是:“Detect failed tasks in concurrent.futures” 是的。但这是否意味着我们不能提出任何替代 OP 的想法?有时没有其他方法可以回答问题,只能重定向到其他内容。 AFAIK concurrent.futures 更现代。我知道多处理没有被弃用,但我不建议回去。当然,有时重定向到其他东西很有帮助。

以上是关于在 concurrent.futures 中检测失败的任务的主要内容,如果未能解决你的问题,请参考以下文章

在 concurrent.futures 中获取异常的原始行号

在 python 的 concurrent.futures 中查找 BrokenProcessPool 的原因

从 concurrent.futures 到 asyncio

为啥我不能在类方法中使用 python 模块 concurrent.futures?

使用 concurrent.futures.ThreadPoolExecutor() 时的 PyQt5 小部件 Qthread 问题

concurrent.futures模块