如何使用 python 的 asyncio 模块正确创建和运行并发任务?
Posted
技术标签:
【中文标题】如何使用 python 的 asyncio 模块正确创建和运行并发任务?【英文标题】:How to properly create and run concurrent tasks using python's asyncio module? 【发布时间】:2015-05-29 22:23:15 【问题描述】:我正在尝试使用 Python 3 相对较新的 asyncio
模块正确理解和实现两个同时运行的 Task
对象。
简而言之,asyncio 似乎旨在通过事件循环处理异步进程和并发Task
执行。它提倡使用await
(应用于异步函数)作为等待和使用结果的无回调方式,而不会阻塞事件循环。 (期货和回调仍然是一个可行的选择。)
它还提供了asyncio.Task()
类,这是Future
的专门子类,旨在包装协程。最好使用asyncio.ensure_future()
方法调用。 asyncio 任务的预期用途是允许独立运行的任务与同一事件循环中的其他任务“同时”运行。我的理解是Tasks
连接到事件循环,然后在await
语句之间自动继续驱动协程。
我喜欢能够使用并发任务而不需要使用Executor
类之一的想法,但我没有找到关于实现的详细说明。
这就是我目前的做法:
import asyncio
print('running async test')
async def say_boo():
i = 0
while True:
await asyncio.sleep(0)
print('...boo 0'.format(i))
i += 1
async def say_baa():
i = 0
while True:
await asyncio.sleep(0)
print('...baa 0'.format(i))
i += 1
# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
在尝试同时运行两个循环任务的情况下,我注意到除非任务具有内部await
表达式,否则它将卡在while
循环中,从而有效地阻止其他任务运行(很多就像普通的while
循环一样)。但是,一旦任务必须(a)等待,它们似乎可以同时运行而没有问题。
因此,await
语句似乎为事件循环提供了一个在任务之间来回切换的立足点,从而产生并发的效果。
带有内部await
的示例输出:
running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2
示例输出没有内部await
:
...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
问题
此实现是否通过asyncio
中并发循环任务的“正确”示例?
唯一可行的方法是让Task
提供一个阻塞点(await
表达式)以便事件循环处理多个任务,这是否正确?
【问题讨论】:
是的,任务从yield from
自动执行到下一个yield from
。
【参考方案1】:
是的,在事件循环中运行的任何协程都会阻止其他协程和任务运行,除非它
-
使用
yield from
或await
(如果使用Python 3.5+)调用另一个协程。
返回。
这是因为asyncio
是单线程的;事件循环运行的唯一方法是没有其他协程主动执行。使用yield from
/await
会暂时挂起协程,让事件循环有机会工作。
您的示例代码很好,但在许多情况下,您可能不希望在事件循环内运行不执行异步 I/O 的长时间运行代码。在这些情况下,使用asyncio.loop.run_in_executor
在后台线程或进程中运行代码通常更有意义。如果您的任务受 CPU 限制,ProcessPoolExecutor
将是更好的选择,如果您需要执行一些对 asyncio
不友好的 I/O,则使用 ThreadPoolExecutor
。
例如,您的两个循环完全受 CPU 限制并且不共享任何状态,因此使用 ProcessPoolExecutor
跨 CPU 并行运行每个循环将获得最佳性能:
import asyncio
from concurrent.futures import ProcessPoolExecutor
print('running async test')
def say_boo():
i = 0
while True:
print('...boo 0'.format(i))
i += 1
def say_baa():
i = 0
while True:
print('...baa 0'.format(i))
i += 1
if __name__ == "__main__":
executor = ProcessPoolExecutor(2)
loop = asyncio.get_event_loop()
boo = loop.run_in_executor(executor, say_boo)
baa = loop.run_in_executor(executor, say_baa)
loop.run_forever()
【讨论】:
谢谢。很好的时机,因为我只是想知道这个主题:使用执行者。 尝试上面的代码并发现 boo 任务正在阻止 baa 运行,除非我将 asyncio.sleep(0) 的产量添加到每个 while True 循环中? @shongololo 抱歉,已修复。应该使用asyncio.async
,而不是直接使用asyncio.Task
构造函数。我们不希望 say_boo
和 say_baa
成为协程,它们应该只是在事件循环之外运行的普通函数,因此您不应向它们添加 yield from
调用或将它们包装在 asyncio.Task
中.
看起来 asyncio.async 是 ensure_future 的别名,现已弃用
我复制了代码并使用 Python 3.9 运行它。它抛出一个错误 - RuntimeError: no running event loop;但继续执行。此外,如上所述,boo 正在阻止进一步的代码执行。我想我明白了 - 创建任务,它将在当前循环中产生,但在“外部”执行器上执行 - 瞧,在后台工作。但它不起作用。第一个任务正在立即运行并阻止其余任务,因为它在同一个循环中。我删除了 asyncio.create_task() 并且它似乎工作正常 - 两个函数都开始在 loop.run_forever() 行上运行。【参考方案2】:
您不一定需要yield from x
来控制事件循环。
在您的示例中,我认为正确的方法是使用yield None
或等效的简单yield
,而不是yield from asyncio.sleep(0.001)
:
import asyncio
@asyncio.coroutine
def say_boo():
i = 0
while True:
yield None
print("...boo 0".format(i))
i += 1
@asyncio.coroutine
def say_baa():
i = 0
while True:
yield
print("...baa 0".format(i))
i += 1
boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
协程只是普通的旧 Python 生成器。
在内部,asyncio
事件循环记录了这些生成器,并在一个永无止境的循环中对它们中的每一个调用gen.send()
。每当您yield
时,对gen.send()
的调用就会完成,并且循环可以继续进行。 (我正在简化它;看看https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 的实际代码)
也就是说,如果您需要在不共享数据的情况下进行 CPU 密集型计算,我仍然会选择 run_in_executor
。
【讨论】:
适用于 Python 3.4,但似乎不适用于 Python 3.5。 3.5有类似的方法吗? (None
似乎比在任何地方使用asyncio.sleep()
更优雅......)
从 Python 3.5 开始,正确的方法是使用asyncio.sleep(0)
。 See this discussion.以上是关于如何使用 python 的 asyncio 模块正确创建和运行并发任务?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 asyncio python 中使用 subprocess 模块限制并发进程的数量