在 concurrent.futures 中检测失败的任务
Posted
技术标签:
【中文标题】在 concurrent.futures 中检测失败的任务【英文标题】:Detect failed tasks in concurrent.futures 【发布时间】:2016-06-13 04:15:24 【问题描述】:我一直在使用 concurrent.futures,因为它有一个简单的界面,让用户可以轻松控制最大线程/进程数。但是,concurrent.futures 似乎隐藏了失败的任务并在所有任务完成/失败后继续主线程。
import concurrent.futures
def f(i):
return (i + 's')
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
fs = [executor.submit(f, i ) for i in range(10)]
concurrent.futures.wait(fs)
对任何整数调用 f 都会导致 TypeError。但是,整个脚本运行良好并以代码 0 退出。有没有办法让它在任何线程失败时抛出异常/错误?
或者,有没有更好的方法来限制线程/进程的数量而不使用 concurrent.futures?
【问题讨论】:
相关:***.com/questions/33448329/… 【参考方案1】:concurrent.futures.wait
将确保所有任务都已完成,但它不会检查成功(return
-ed)与失败(引发异常且未在工作函数中捕获)。为此,您需要在每个Future
上调用.result()
(这将导致它要么重新raise
来自任务的异常,要么产生return
-ed 值)。还有其他方法可以在不实际引发主线程的情况下进行检查(例如.exception()
),但.result()
是最直接的方法。
如果你想让它重新raise
,最简单的方法就是将wait()
调用替换为:
for fut in concurrent.futures.as_completed(fs):
fut.result()
它将在Future
s 完成时处理结果,并在发生时立即处理raise
和Exception
。或者,您可以继续使用 wait
,以便所有任务在检查任何异常之前完成,然后直接迭代 fs
并在每个任务上调用 .result()
。
【讨论】:
【参考方案2】:还有另一种方法可以使用 multiprocessing.Pool(用于进程)或 multiprocessing.pool.ThreadPool(用于线程)。据我所知,它会重新抛出任何捕获的异常。
【讨论】:
这个问题是关于concurrent.futures
AFAIK,这是多处理模块的更现代的实现。对我来说,建议使用旧库是没有意义的。
@guettli,这个旧库仍然被许多并行库积极使用,例如Joblib 和 Dask。而且 future 在功能上并不等同于具有 map
函数的 Pool - 这可能比等待多个未来对象更有效。
是的,许多库都在积极使用多处理。但是这个问题的标题是:“Detect failed tasks in concurrent.futures”
是的。但这是否意味着我们不能提出任何替代 OP 的想法?有时没有其他方法可以回答问题,只能重定向到其他内容。
AFAIK concurrent.futures 更现代。我知道多处理没有被弃用,但我不建议回去。当然,有时重定向到其他东西很有帮助。以上是关于在 concurrent.futures 中检测失败的任务的主要内容,如果未能解决你的问题,请参考以下文章
在 concurrent.futures 中获取异常的原始行号
在 python 的 concurrent.futures 中查找 BrokenProcessPool 的原因
从 concurrent.futures 到 asyncio
为啥我不能在类方法中使用 python 模块 concurrent.futures?
使用 concurrent.futures.ThreadPoolExecutor() 时的 PyQt5 小部件 Qthread 问题