使用 asyncio 处理超时

Posted

技术标签:

【中文标题】使用 asyncio 处理超时【英文标题】:Handling Timeouts with asyncio 【发布时间】:2020-03-12 23:59:42 【问题描述】:

免责声明:这是我第一次尝试asyncio 模块。

我正在以下列方式使用asyncio.wait 来尝试支持超时功能,以等待来自一组异步任务的所有结果。这是一个更大的库的一部分,所以我省略了一些不相关的代码。

请注意,该库已经支持通过 ThreadPoolExecutors 和 ProcessPoolExecutors 提交任务和使用超时,所以我对使用它们的建议或关于我为什么使用 asyncio 这样做的问题并不感兴趣。上代码...

import asyncio
from contextlib import suppress

... 

class Asynciosubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError

起初我并不担心在超时时取消挂起的任务,但后来我在程序退出时收到警告 Task was destroyed but it is pending!loop.close。经过一番研究,我发现了多种取消任务并等待它们实际被取消的方法:

选项 1:

[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)

选项 2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))

选项 3:

# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task

选项 4:

类似于this 答案中的某种while 循环。似乎我的其他选择更好,但包括完整性。


到目前为止,选项 1 和 2 似乎都可以正常工作。任何一个选项都可能是“正确的”,但是随着asyncio 多年来的发展,网络上的示例和建议要么已经过时,要么变化很大。所以我的问题是......

问题 1

选项 1 和 2 之间有什么实际区别吗?我知道run_until_complete 将一直运行到未来完成,所以由于选项 1 以特定顺序循环,我想如果早期任务需要更长时间才能实际完成,它的行为可能会有所不同。我尝试查看 asyncio 源代码以了解 asyncio.wait 是否只是有效地对其任务/未来做同样的事情,但这并不明显。

问题 2

我假设如果其中一项任务处于长时间运行的阻塞操作的中间,它实际上可能不会立即取消?也许这仅取决于所使用的底层操作或库是否会立即引发 CancelledError?也许为 asyncio 设计的库永远不会发生这种情况?

由于我在这里尝试实现超时功能,因此对此有些敏感。如果这些事情可能需要很长时间才能取消,我会考虑致电cancel 而不是等待它实际发生,或者设置一个非常短的超时来等待取消完成。

问题 3

loop.run_until_complete(或者实际上是对async.wait 的底层调用)是否有可能出于超时以外的原因返回unfinished 中的值?如果是这样,我显然必须稍微调整一下我的逻辑,但从docs 看来,这似乎是不可能的。

【问题讨论】:

【参考方案1】:

选项 1 和 2 之间有什么实际区别吗?

没有。选项 2 看起来更好,可能效率更高,但它们的净效果是一样的。

我知道run_until_complete 将一直运行到未来完成,所以由于选项 1 以特定顺序循环,我想如果早期任务需要更长的时间才能真正完成,它的行为可能会有所不同。

一开始似乎是这样,但实际上并非如此,因为loop.run_until_complete 运行提交给循环的所有 个任务,而不仅仅是作为参数传递的那个。它只是在提供的等待完成后停止 - 这就是“运行直到完成”所指的内容。在已调度的任务上调用run_until_complete 的循环类似于以下异步代码:

ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t

在语义上等价于以下线程代码:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()

换句话说,await trun_until_complete(t) 会阻塞直到 t 完成,但允许其他所有内容 - 例如之前使用 asyncio.create_task() 计划的任务也可以在此期间运行。所以总运行时间将等于最长任务的运行时间,而不是它们的总和。例如,如果第一个任务恰好需要很长时间,那么所有其他任务都将在此期间完成,并且它们的 await 根本不会休眠。

所有这些仅适用于先前已安排的等待任务。如果您尝试将其应用于协程,它将不起作用:

# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()

对于 asyncio 初学者来说,这通常是一个症结所在,他们编写与上一个 asyncio 示例等效的代码并希望它能够并行运行。

我尝试查看 asyncio 源代码以了解 asyncio.wait 是否有效地对其任务/未来做同样的事情,但这并不明显。

asyncio.wait 只是一个方便的 API,它做了两件事:

将输入参数转换为实现Future 的东西。对于协程,这意味着它将它们提交给事件循环,就像使用 create_task 一样,这允许它们独立运行。如果您一开始就给它分配任务,那么这一步将被跳过。 使用add_done_callback 在期货完成时收到通知,此时它将恢复其调用者。

是的,它做同样的事情,但实现方式不同,因为它支持更多功能。

我假设如果其中一个任务处于长时间运行的阻塞操作的中间,它实际上可能不会立即取消?

在 asyncio 中不应该有“阻塞”操作,只有那些暂停的操作,它们应该立即取消。例外情况是使用run_in_executor 阻止附加到 asyncio 上的代码,其中底层操作根本不会取消,但 asyncio 协程将立即获得异常。

也许这仅取决于所使用的底层操作或库是否会立即引发 CancelledError?

库没有提升CancelledError,它接收它在等待点,它恰好在取消发生之前暂停。对于库,取消的效果是await ... 中断其等待并立即提高CancelledError。除非被捕获,否则异常将通过函数和await 调用一直传播到***协程,其引发CancelledError 将整个任务标记为已取消。表现良好的 asyncio 代码可以做到这一点,可能使用finally 来释放它们持有的操作系​​统级资源。当CancelledError 被捕获时,代码可以选择不重新引发它,在这种情况下取​​消被有效地忽略。

是否有可能 loop.run_until_complete(或者实际上是对 async.wait 的底层调用)由于超时以外的原因返回未完成的值?

如果您使用的是return_when=asyncio.ALL_COMPLETE(默认),那应该是不可能的。 return_when=FIRST_COMPLETED 是完全可能的,那么它显然可以独立于超时。

【讨论】:

谢谢,很有帮助。关于您的第一个线程示例的一个快速问题/澄清......我假设您只是将其展示为一种类似但不完全相同的方式来使用线程执行此操作。我的假设是 asyncio 不会在后台执行任何多线程,只是想确保我没有误解。 @totalhack 在所有方面都正确。 Asyncio 是凶猛的单线程,它的实现完全不同。正如您所猜测的那样,我的目标是通过与线程中的等效概念进行类比来解释这些概念。

以上是关于使用 asyncio 处理超时的主要内容,如果未能解决你的问题,请参考以下文章

Python3 asyncio:wait_for()通信()超时,如何获得部分结果?

Asyncio 和 Discord.py 超时上下文管理器应该在任务中使用

《asyncio 系列》10. 在微服务中集成 asyncio,以及超时控制自动重试服务降级

Python之asyncio模块的使用

Python asyncio:处理潜在的无限列表

asyncio的简单使用,python异步高效处理数据,asyncio.get_event_loop(),loop.run_until_complete(main()),loop.close()