Python asyncio 任务排序
Posted
技术标签:
【中文标题】Python asyncio 任务排序【英文标题】:Python asyncio task ordering 【发布时间】:2017-01-21 23:19:59 【问题描述】:我有一个关于 python 的asyncio
模块中的事件循环如何管理未完成任务的问题。考虑以下代码:
import asyncio
@asyncio.coroutine
def a():
for i in range(0, 3):
print('a.' + str(i))
yield
@asyncio.coroutine
def b():
for i in range(0, 3):
print('b.' + str(i))
yield
@asyncio.coroutine
def c():
for i in range(0, 3):
print('c.' + str(i))
yield
tasks = [
asyncio.Task(a()),
asyncio.Task(b()),
asyncio.Task(c()),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([t1, t2, t3]))
运行此程序将打印:
a.0
b.0
c.0
a.1
b.1
c.1
a.2
b.2
c.2
请注意,它总是打印出 'a' 然后 'b' 然后 'c'。我猜无论每个协程经过多少次迭代,它总是会按该顺序打印。所以你永远不会看到类似的东西
b.100
c.100
a.100
来自 node.js 背景,这告诉我这里的事件循环在内部维护一个队列,它用来决定接下来运行哪个任务。它最初将a()
放在队列的前面,然后是b()
,然后是c()
,因为这是传递给asyncio.wait()
的列表中任务的顺序。然后,每当它遇到一个 yield 语句时,它就会将该任务放在队列的末尾。我想在一个更现实的例子中,假设你正在执行一个异步 http 请求,它会在 http 响应返回后将 a()
放回队列的末尾。
我可以得到一个阿门吗?
【问题讨论】:
【参考方案1】:目前您的示例不包含任何阻塞 I/O 代码。试试这个来模拟一些任务:
import asyncio
@asyncio.coroutine
def coro(tag, delay):
for i in range(1, 8):
print(tag, i)
yield from asyncio.sleep(delay)
loop = asyncio.get_event_loop()
print("---- await 0 seconds :-) --- ")
tasks = [
asyncio.Task(coro("A", 0)),
asyncio.Task(coro("B", 0)),
asyncio.Task(coro("C", 0)),
]
loop.run_until_complete(asyncio.wait(tasks))
print("---- simulate some blocking I/O --- ")
tasks = [
asyncio.Task(coro("A", 0.1)),
asyncio.Task(coro("B", 0.3)),
asyncio.Task(coro("C", 0.5)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
如您所见,协程是按需要安排的,而不是按顺序安排的。
【讨论】:
对,它会尽快运行回调,因此如果 I/O 需要不同的时间来完成,它们将开始出现故障。我的观点是,如果没有 I/O 进行(如我的示例中),由于对幕后发生的任务(可能是队列)进行某种管理,它们总是以可预测的顺序运行。与线程之类的东西相反,它们会根据操作系统线程调度程序运行它们并且是不可预测的。 如果没有 I/O,不要使用 asyncio。它们是不可预测的,您无法确定它在不同的实现或版本或不同的条件下如何工作。 感谢您的回复,但我的问题不在于正确使用asyncio
,而在于它是如何实现的。当然,对库实现细节做出假设是不安全的,而且我并没有试图通过提出这个问题来暗示其他情况。只是想看看是否有人知道它是如何工作的。
如果你对实现细节感兴趣,你可以在这里阅读 Python 3.6 的基本实现,BaseEventLoop._run_once
。【参考方案2】:
免责声明 对于至少 v3.9 的默认实现,这似乎是正确的。然而,事件循环的内部工作不是公共接口,因此可能会随着新版本的变化而改变。此外,asyncio 允许替换 BaseEventLoop
实现,这可能会改变其行为。
当创建Task
对象时,它会调用loop.call_soon
将其_step
方法注册为回调。 _step
方法实际上是通过调用send()
来调用你的协程并处理结果。
在BaseEventLoop
中,loop.call_soon
将_step
回调放在_ready
回调列表的末尾。每次运行事件循环,都会以 FIFO 顺序迭代 _ready
回调列表并调用它们。因此,对于任务的初始运行,它们按照创建的顺序执行。
当任务awaits
或yield
s 是一个future 时,当任务的_wakeup
方法被放入队列时,它真的取决于这个future 的性质。
另外,请注意,可以在创建任务之间注册其他回调。虽然确实如果 TaskA
在 TaskB
之前创建,TaskA
的初始运行将在 TaskB
之前发生,但仍有可能在其间运行其他回调。
最后,上述行为也适用于asyncio
附带的默认Task
类。但是,可以指定自定义任务工厂并使用替代任务实现,这也可以改变这种行为。
【讨论】:
以上是关于Python asyncio 任务排序的主要内容,如果未能解决你的问题,请参考以下文章
Python 3.6 asyncio - 从未检索到任务异常 - 任务的产量不好:200
Python 的 asyncio.gather() 似乎没有异步运行任务
如何使用 python 的 asyncio 模块正确创建和运行并发任务?
python 并发专题(十三):asyncio 协程中的多任务