如果一个失败,如何取消收集中的所有剩余任务?
Posted
技术标签:
【中文标题】如果一个失败,如何取消收集中的所有剩余任务?【英文标题】:How to cancel all remaining tasks in gather if one fails? 【发布时间】:2020-03-23 04:59:03 【问题描述】:如果gather
的一个任务引发异常,其他任务仍然可以继续。
嗯,这不是我所需要的。我想区分需要取消所有剩余任务的致命错误,以及不是但应该记录的错误,同时允许其他任务继续。
这是我实现这一点的失败尝试:
from asyncio import gather, get_event_loop, sleep
class ErrorThatShouldCancelOtherTasks(Exception):
pass
async def my_sleep(secs):
await sleep(secs)
if secs == 5:
raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
print(f'Slept for secssecs.')
async def main():
try:
sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
await sleepers
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
sleepers.cancel()
finally:
await sleep(5)
get_event_loop().run_until_complete(main())
(这里的finally await sleep
是为了防止解释器立即关闭,这会自行取消所有任务)
奇怪的是,在gather
上调用cancel
实际上并没有取消它!
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
我对这种行为感到非常惊讶,因为它似乎与 the documentation 相矛盾,其中指出:
asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
从给定的协程对象或未来返回一个未来的聚合结果。
(...)
取消:如果外部Future 被取消,那么所有子项(尚未完成)也被取消。 (...)
我在这里缺少什么?如何取消剩余的任务?
【问题讨论】:
【参考方案1】:您的实现的问题是它在sleepers
已经引发之后调用sleepers.cancel()
。从技术上讲,gather()
返回的未来处于完成状态,因此它的取消必须是空操作。
要更正代码,您只需要自己取消孩子,而不是相信gather
的未来会这样做。当然,协程本身不可取消,因此您需要先将它们转换为任务(gather
无论如何都会这样做,因此您无需做额外的工作)。例如:
async def main():
tasks = [asyncio.ensure_future(my_sleep(secs))
for secs in [2, 5, 7]]
try:
await asyncio.gather(*tasks)
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
for t in tasks:
t.cancel()
finally:
await sleep(5)
我对这种行为感到非常惊讶,因为它似乎与文档相矛盾[...]
gather
最初的绊脚石是它并没有真正运行 任务,它只是等待它们完成的助手。出于这个原因,gather
不会在剩余的任务因异常而失败时取消它们——它只是放弃等待并传播异常,让剩余的任务在后台继续进行。这是reported as a bug,但为了向后兼容并没有修复它,因为该行为已记录在案并且从一开始就没有改变。但是在这里我们还有另一个缺点:文档明确承诺能够取消返回的未来。您的代码正是这样做的,但它不起作用,没有明显的原因(至少我花了一段时间才弄清楚,并且需要阅读source)。事实证明,Future
的合约实际上阻止了它的工作。当您调用cancel()
时,gather
返回的未来已经完成,取消已完成的未来是没有意义的,它只是空操作。 (原因是已完成的 future 具有明确定义的结果,外部代码可以观察到。取消它会更改其结果,这是不允许的。)
换句话说,文档没有错误,因为如果您在await sleepers
完成之前执行了取消操作,那么它会起作用。然而,它具有误导性,因为它似乎允许在这个重要的用例中取消gather()
,但实际上并没有。
使用gather
时出现的此类问题是许多人热切期待(不是双关语)三重奏式托儿所in asyncio 的原因。
【讨论】:
非常感谢!还有一个需要解决的问题是,如果一个 coro 引发了一个非致命错误,那么随后另一个 coro 引发的致命错误将被忽略;我想答案是将my_sleep
的实现包装在(否则不明智的)try: ... except Exception as err: log_exception_and_ignore_it(err)
?
@gaazkam 如果您的要求是继续前进,那可能是最简单的方法。还可以选择将gather
包装在一个循环中,捕获except Exception
并修剪引发的任务(并记录它们的异常),但这最终会比您建议的代码多得多,而且没有真正的收获。
为了避免在所有task.cancel()
调用中引发取消错误,我需要做的一件事是通过等待另一个收集语句来跟进for t in tasks
循环,即await asyncio.gather(*tasks, return_exceptions=True)
【参考方案2】:
您可以创建自己的自定义收集功能
这会在发生任何异常时取消其所有子项:
import asyncio
async def gather(*tasks, **kwargs):
tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
for task in tasks ]
try:
return await asyncio.gather(*tasks, **kwargs)
except BaseException as e:
for task in tasks:
task.cancel()
raise e
# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())
【讨论】:
以上是关于如果一个失败,如何取消收集中的所有剩余任务?的主要内容,如果未能解决你的问题,请参考以下文章