如果一个失败,如何取消收集中的所有剩余任务?

Posted

技术标签:

【中文标题】如果一个失败,如何取消收集中的所有剩余任务?【英文标题】:How to cancel all remaining tasks in gather if one fails? 【发布时间】:2020-03-23 04:59:03 【问题描述】:

如果gather 的一个任务引发异常,其他任务仍然可以继续。

嗯,这不是我所需要的。我想区分需要取消所有剩余任务的致命错误,以及不是但应该记录的错误,同时允许其他任务继续。

这是我实现这一点的失败尝试:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for secssecs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())

(这里的finally await sleep是为了防止解释器立即关闭,这会自行取消所有任务)

奇怪的是,在gather 上调用cancel 实际上并没有取消它!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.

我对这种行为感到非常惊讶,因为它似乎与 the documentation 相矛盾,其中指出:

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

从给定的协程对象或未来返回一个未来的聚合结果。

(...)

取消:如果外部Future 被取消,那么所有子项(尚未完成)也被取消。 (...)

我在这里缺少什么?如何取消剩余的任务?

【问题讨论】:

【参考方案1】:

您的实现的问题是它在sleepers 已经引发之后调用sleepers.cancel()。从技术上讲,gather() 返回的未来处于完成状态,因此它的取消必须是空操作。

要更正代码,您只需要自己取消孩子,而不是相信gather 的未来会这样做。当然,协程本身不可取消,因此您需要先将它们转换为任务(gather 无论如何都会这样做,因此您无需做额外的工作)。例如:

async def main():
    tasks = [asyncio.ensure_future(my_sleep(secs))
             for secs in [2, 5, 7]]
    try:
        await asyncio.gather(*tasks)
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        for t in tasks:
            t.cancel()
    finally:
        await sleep(5)

我对这种行为感到非常惊讶,因为它似乎与文档相矛盾[...]

gather 最初的绊脚石是它并没有真正运行 任务,它只是等待它们完成的助手。出于这个原因,gather 不会在剩余的任务因异常而失败时取消它们——它只是放弃等待并传播异常,让剩余的任务在后台继续进行。这是reported as a bug,但为了向后兼容并没有修复它,因为该行为已记录在案并且从一开始就没有改变。但是在这里我们还有另一个缺点:文档明确承诺能够取消返回的未来。您的代码正是这样做的,但它不起作用,没有明显的原因(至少我花了一段时间才弄清楚,并且需要阅读source)。事实证明,Future 的合约实际上阻止了它的工作。当您调用cancel() 时,gather 返回的未来已经完成,取消已完成的未来是没有意义的,它只是空操作。 (原因是已完成的 future 具有明确定义的结果,外部代码可以观察到。取消它会更改其结果,这是不允许的。)

换句话说,文档没有错误,因为如果您在await sleepers 完成之前执行了取消操作,那么它会起作用。然而,它具有误导性,因为它似乎允许在这个重要的用例中取消gather(),但实际上并没有。

使用gather 时出现的此类问题是许多人热切期待(不是双关语)三重奏式托儿所in asyncio 的原因。

【讨论】:

非常感谢!还有一个需要解决的问题是,如果一个 coro 引发了一个非致命错误,那么随后另一个 coro 引发的致命错误将被忽略;我想答案是将my_sleep 的实现包装在(否则不明智的)try: ... except Exception as err: log_exception_and_ignore_it(err)? @gaazkam 如果您的要求是继续前进,那可能是最简单的方法。还可以选择将gather 包装在一个循环中,捕获except Exception 并修剪引发的任务(并记录它们的异常),但这最终会比您建议的代码多得多,而且没有真正的收获。 为了避免在所有task.cancel() 调用中引发取消错误,我需要做的一件事是通过等待另一个收集语句来跟进for t in tasks 循环,即await asyncio.gather(*tasks, return_exceptions=True)【参考方案2】:

您可以创建自己的自定义收集功能

这会在发生任何异常时取消其所有子项:

import asyncio

async def gather(*tasks, **kwargs):
    tasks = [ task if isinstance(task, asyncio.Task) else asyncio.create_task(task)
              for task in tasks ]
    try:
        return await asyncio.gather(*tasks, **kwargs)
    except BaseException as e:
        for task in tasks:
            task.cancel()
        raise e


# If a() or b() raises an exception, both are immediately cancelled
a_result, b_result = await gather(a(), b())

【讨论】:

以上是关于如果一个失败,如何取消收集中的所有剩余任务?的主要内容,如果未能解决你的问题,请参考以下文章

StateFlow:收集后取消旧的发射状态

POWERSHELL 计划任务的创建,收集DC中失败的登录信息并邮件通知

如何让backbone.stickit 一次收集所有值?

Android白名单&后台唤醒收集

Azkaban 简介

我如何收集gitlab管道中所有作业的日志?