Wrapping asyncio.gather in a timeout


【Title】Wrapping asyncio.gather in a timeout 【Posted】2019-01-29 18:14:17 【Question】:

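I've seen "asyncio.gather vs asyncio.wait", but I'm not sure it addresses this particular problem. What I want to do is wrap the asyncio.gather() coroutine in asyncio.wait_for(), with a timeout argument. I also need to satisfy these conditions: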

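return_exceptions=True (from asyncio.gather()): rather than propagating exceptions to the task that awaits gather(), I want exception instances included in the results.

Order: retain the property of asyncio.gather() that results come back in the same order as the inputs (or, failing that, map the outputs back to the inputs). asyncio.wait_for() does not meet this criterion, and I'm not sure of the ideal way to achieve it.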

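The timeout applies to the entire asyncio.gather() across the list of awaitables: whether an awaitable gets caught by the timeout or raises an exception, either case should simply put an exception instance in the result list.

Consider this setup: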

>>> import asyncio
>>> import random
>>> from time import perf_counter
>>> from typing import Iterable
>>> from pprint import pprint
>>> 
>>> async def coro(i, threshold=0.4):
...     await asyncio.sleep(i)
...     if i > threshold:
...         # For illustration's sake - some coroutines may raise,
...         # and we want to accommodate that and just test for exception
...         # instances in the results of asyncio.gather(return_exceptions=True)
...         raise Exception("i too high")
...     return i
... 
>>> async def main(n, it: Iterable):
...     res = await asyncio.gather(
...         *(coro(i) for i in it),
...         return_exceptions=True
...     )
...     return res
... 
>>> 
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> res = asyncio.run(main(n, it=it))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")  # Expectation: ~1 seconds
Done main(10) in 0.86 seconds
>>> pprint(dict(zip(it, res)))
{0.01323751590501987: 0.01323751590501987,
 0.07422124156714727: 0.07422124156714727,
 0.3088946587429545: 0.3088946587429545,
 0.3113884366691503: 0.3113884366691503,
 0.4419557492849159: Exception('i too high'),
 0.4844375347808497: Exception('i too high'),
 0.5796792804615848: Exception('i too high'),
 0.6338658027451068: Exception('i too high'),
 0.7426396870165088: Exception('i too high'),
 0.8614799253779063: Exception('i too high')}

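The program above, with n = 10, runs for roughly as long as its slowest coroutine (about 0.86 seconds with this seed), plus a bit of overhead from running asynchronously. (random.random() is uniformly distributed over [0, 1).)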

Suppose I want to apply a timeout to the whole operation (i.e. to the coroutine main()):

timeout = 0.5

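Now, I could use asyncio.wait(), but the problem is that its results are set objects, so the ordered-return-value property of asyncio.gather() is definitely not guaranteed: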

>>> async def main(n, it, timeout) -> tuple:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     done, pending = await asyncio.wait(tasks, timeout=timeout)
...     return done, pending
... 
>>> timeout = 0.5
>>> random.seed(444)
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> done, pending = asyncio.run(main(n, it=it, timeout=timeout))
>>> for i in pending:
...     i.cancel()
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds
>>> done
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>, <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>}
>>> pprint(done)
{<Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>}
>>> pprint(pending)
{<Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>}

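As noted above, the problem is that I can't seem to map the task instances back to the inputs in the iterable. Their task IDs are effectively lost within the function scope of tasks = [asyncio.create_task(coro(i)) for i in it]. Is there a Pythonic way, or a way using the asyncio API, to mimic the behavior of asyncio.gather() here?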
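For reference, the naive approach mentioned at the start of the question (wrapping asyncio.gather() directly in asyncio.wait_for()) can be sketched roughly as below. The work() coroutine and the delays are hypothetical stand-ins, not part of the original setup. The point it tries to show is that when the timeout fires, the whole gather() is cancelled and even the results that were already available are lost.

```python
import asyncio


async def work(delay):
    # Hypothetical stand-in for coro() from the question
    await asyncio.sleep(delay)
    return delay


async def main():
    try:
        return await asyncio.wait_for(
            asyncio.gather(work(0.01), work(0.5), return_exceptions=True),
            timeout=0.1,
        )
    except asyncio.TimeoutError:
        # The timeout cancels the whole gather(); the result of
        # work(0.01), which had already finished, is discarded as well.
        return None


result = asyncio.run(main())
print(result)
```

So return_exceptions=True alone does not help here: it controls how exceptions from the children are reported, not what happens when the gather() itself is cancelled.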


【Answer 1】:

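Looking at the underlying _wait() coroutine, it is passed a list of tasks and modifies the state of those tasks in place. This means that, within the scope of main(), the tasks from tasks = [asyncio.create_task(coro(i)) for i in it] are modified by the call to await asyncio.wait(tasks, timeout=timeout). One workaround is to return tasks itself rather than the (done, pending) tuple, since tasks keeps the same order as the input it. wait()/_wait() merely splits the tasks into done/pending subsets; in this case we can discard those subsets and use the whole tasks list, whose elements have been updated.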
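A small sketch of that observation, assuming a hypothetical work() coroutine and made-up delays: the done and pending sets contain the very same Task objects as the list, so the list can be walked in input order and the sets used only for membership checks.

```python
import asyncio


async def work(delay):
    # Hypothetical stand-in for coro() from the question
    await asyncio.sleep(delay)
    return delay


async def main():
    delays = [0.2, 0.01, 0.02]
    tasks = [asyncio.create_task(work(d)) for d in delays]
    done, pending = await asyncio.wait(tasks, timeout=0.1)

    # Every task in the list shows up in exactly one of the two sets
    same_objects = all(t in done or t in pending for t in tasks)

    # Walk the list (input order); use the sets only to look up state
    states = [
        (d, "done" if t in done else "pending")
        for d, t in zip(delays, tasks)
    ]

    for t in pending:
        t.cancel()
    return same_objects, states


same_objects, states = asyncio.run(main())
print(same_objects, states)
```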

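There are three possible task states in this case:

1. The task returned a valid result (coro() did not raise) and finished within the timeout. Its .cancelled() will be False, and it has a valid .result() that is not an exception instance.

2. The task was hit by the timeout before it had a chance to return a result or raise an exception. It will show .cancelled() as True, and its .exception() will raise CancelledError.

3. The task had time to finish and raised an exception from coro(). It will show .cancelled() as False, its .exception() will return that exception instance, and its .result() will re-raise it.

(All of this is laid out in asyncio/futures.py.)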
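The three states can be told apart with only .cancelled() and .exception(). The following is a minimal sketch; describe(), ok(), boom() and slow() are hypothetical names made up for the illustration, and the pending task is cancelled explicitly rather than relying on asyncio.run() to clean it up.

```python
import asyncio


def describe(task):
    """Classify a task that is already done (finished or cancelled)."""
    if task.cancelled():
        # Check this first: .exception() would raise CancelledError here
        return "cancelled"
    if task.exception() is not None:
        return "raised"
    return "returned"


async def ok():
    return 1


async def boom():
    raise ValueError("boom")


async def slow():
    await asyncio.sleep(10)


async def main():
    tasks = [asyncio.create_task(c()) for c in (ok, boom, slow)]
    await asyncio.wait(tasks, timeout=0.05)
    for t in tasks:
        if not t.done():
            t.cancel()
    # Give the cancellation a chance to be delivered before inspecting
    await asyncio.wait(tasks)
    return [describe(t) for t in tasks]


states = asyncio.run(main())
print(states)
```

The order of the checks matters: calling .exception() on a cancelled task raises instead of returning, so .cancelled() has to be tested first.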


Illustration:

>>> # imports/other code snippets - see question
>>> async def main(n, it, timeout) -> list:
...     tasks = [asyncio.create_task(coro(i)) for i in it]
...     await asyncio.wait(tasks, timeout=timeout)
...     return tasks  # *not* (done, pending)

>>> timeout = 0.5
>>> random.seed(444)
>>> n = 10
>>> it = [random.random() for _ in range(n)]
>>> start = perf_counter()
>>> tasks = asyncio.run(main(n, it=it, timeout=timeout))
>>> elapsed = perf_counter() - start
>>> print(f"Done main({n}) in {elapsed:0.2f} seconds")
Done main(10) in 0.50 seconds

>>> pprint(tasks)
[<Task finished coro=<coro() done, defined at <stdin>:1> result=0.3088946587429545>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.01323751590501987>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>,
 <Task finished coro=<coro() done, defined at <stdin>:1> exception=Exception('i too high')>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.3113884366691503>,
 <Task finished coro=<coro() done, defined at <stdin>:1> result=0.07422124156714727>,
 <Task cancelled coro=<coro() done, defined at <stdin>:1>>]

Now apply the logic above so that res keeps the order corresponding to the inputs:

>>> res = []
>>> for t in tasks:
...     try:
...         r = t.result()
...     except Exception as e:
...         res.append(e)
...     else:
...         res.append(r)
>>> pprint(res)
[0.3088946587429545,
 0.01323751590501987,
 Exception('i too high'),
 CancelledError(),
 CancelledError(),
 CancelledError(),
 Exception('i too high'),
 0.3113884366691503,
 0.07422124156714727,
 CancelledError()]
>>> dict(zip(it, res))
{0.3088946587429545: 0.3088946587429545,
 0.01323751590501987: 0.01323751590501987,
 0.4844375347808497: Exception('i too high'),
 0.8614799253779063: concurrent.futures._base.CancelledError(),
 0.7426396870165088: concurrent.futures._base.CancelledError(),
 0.6338658027451068: concurrent.futures._base.CancelledError(),
 0.4419557492849159: Exception('i too high'),
 0.3113884366691503: 0.3113884366691503,
 0.07422124156714727: 0.07422124156714727,
 0.5796792804615848: concurrent.futures._base.CancelledError()}

【Discussion】:

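As of Python 3.8, except Exception as e: no longer works here, because CancelledError now derives from BaseException rather than Exception. In the result-collecting code block, change it to except BaseException as e: ....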
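Putting the answer and this comment together, a version that should also behave on Python 3.8+ might look like the sketch below. gather_with_timeout() is a hypothetical helper name, the threshold and delays are scaled down for the illustration, and catching (Exception, asyncio.CancelledError) is a narrower alternative to BaseException that I am assuming is acceptable here. Pending tasks are cancelled explicitly inside the helper instead of being left for asyncio.run() to clean up.

```python
import asyncio


async def coro(i, threshold=0.04):
    # Same shape as coro() in the question, with a smaller threshold
    await asyncio.sleep(i)
    if i > threshold:
        raise Exception("i too high")
    return i


async def gather_with_timeout(coros, timeout):
    """Hypothetical helper: ordered results with exceptions included."""
    tasks = [asyncio.create_task(c) for c in coros]
    _, pending = await asyncio.wait(tasks, timeout=timeout)
    for t in pending:
        t.cancel()
    if pending:
        # Wait until the cancellations have actually been processed
        await asyncio.wait(pending)

    res = []
    for t in tasks:  # the list is still in input order
        try:
            res.append(t.result())
        except (Exception, asyncio.CancelledError) as e:
            # CancelledError is a BaseException subclass on 3.8+
            res.append(e)
    return res


inputs = [0.01, 0.3, 0.05]
res = asyncio.run(
    gather_with_timeout([coro(i) for i in inputs], timeout=0.15)
)
print(dict(zip(inputs, res)))
```

With these inputs, the first entry is a plain result, the second is a CancelledError instance (cut off by the timeout), and the third is the exception raised by coro().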
