Python asyncio 任务排序

Posted

技术标签:

【中文标题】Python asyncio 任务排序【英文标题】:Python asyncio task ordering 【发布时间】:2017-01-21 23:19:59 【问题描述】:

我有一个关于 python 的asyncio 模块中的事件循环如何管理未完成任务的问题。考虑以下代码:

import asyncio

@asyncio.coroutine
def a():
   for i in range(0, 3):
      print('a.' + str(i))
      yield


@asyncio.coroutine
def b():
   for i in range(0, 3):
      print('b.' + str(i))
      yield


@asyncio.coroutine
def c():
   for i in range(0, 3):
      print('c.' + str(i))
      yield


tasks = [
   asyncio.Task(a()),
   asyncio.Task(b()),
   asyncio.Task(c()),
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([t1, t2, t3]))

运行此程序将打印:

a.0
b.0
c.0
a.1
b.1
c.1
a.2
b.2
c.2

请注意,它总是打印出 'a' 然后 'b' 然后 'c'。我猜无论每个协程经过多少次迭代,它总是会按该顺序打印。所以你永远不会看到类似的东西

b.100
c.100
a.100

来自 node.js 背景,这告诉我这里的事件循环在内部维护一个队列,它用来决定接下来运行哪个任务。它最初将a() 放在队列的前面,然后是b(),然后是c(),因为这是传递给asyncio.wait() 的列表中任务的顺序。然后,每当它遇到一个 yield 语句时,它就会将该任务放在队列的末尾。我想在一个更现实的例子中,假设你正在执行一个异步 http 请求,它会在 http 响应返回后将 a() 放回队列的末尾。

我可以得到一个阿门吗?

【问题讨论】:

【参考方案1】:

目前您的示例不包含任何阻塞 I/O 代码。试试这个来模拟一些任务:

import asyncio


@asyncio.coroutine
def coro(tag, delay):
    for i in range(1, 8):
        print(tag, i)
        yield from asyncio.sleep(delay)


loop = asyncio.get_event_loop()

print("---- await 0 seconds :-) --- ")
tasks = [
    asyncio.Task(coro("A", 0)),
    asyncio.Task(coro("B", 0)),
    asyncio.Task(coro("C", 0)),
]

loop.run_until_complete(asyncio.wait(tasks))

print("---- simulate some blocking I/O --- ")
tasks = [
    asyncio.Task(coro("A", 0.1)),
    asyncio.Task(coro("B", 0.3)),
    asyncio.Task(coro("C", 0.5)),
]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

如您所见,协程是按需要安排的,而不是按顺序安排的。

【讨论】:

对,它会尽快运行回调,因此如果 I/O 需要不同的时间来完成,它们将开始出现故障。我的观点是,如果没有 I/O 进行(如我的示例中),由于对幕后发生的任务(可能是队列)进行某种管理,它们总是以可预测的顺序运行。与线程之类的东西相反,它们会根据操作系统线程调度程序运行它们并且是不可预测的。 如果没有 I/O,不要使用 asyncio。它们是不可预测的,您无法确定它在不同的实现或版本或不同的条件下如何工作。 感谢您的回复,但我的问题不在于正确使用 asyncio,而在于它是如何实现的。当然,对库实现细节做出假设是不安全的,而且我并没有试图通过提出这个问题来暗示其他情况。只是想看看是否有人知道它是如何工作的。 如果你对实现细节感兴趣,你可以在这里阅读 Python 3.6 的基本实现,BaseEventLoop._run_once【参考方案2】:

免责声明 对于至少 v3.9 的默认实现,这似乎是正确的。然而,事件循环的内部工作不是公共接口,因此可能会随着新版本的变化而改变。此外,asyncio 允许替换 BaseEventLoop 实现,这可能会改变其行为。

当创建Task 对象时,它会调用loop.call_soon 将其_step 方法注册为回调。 _step 方法实际上是通过调用send() 来调用你的协程并处理结果。

BaseEventLoop 中,loop.call_soon_step 回调放在_ready 回调列表的末尾。每次运行事件循环,都会以 FIFO 顺序迭代 _ready 回调列表并调用它们。因此,对于任务的初始运行,它们按照创建的顺序执行。

当任务awaitsyields 是一个future 时,当任务的_wakeup 方法被放入队列时,它真的取决于这个future 的性质。

另外,请注意,可以在创建任务之间注册其他回调。虽然确实如果 TaskATaskB 之前创建,TaskA 的初始运行将在 TaskB 之前发生,但仍有可能在其间运行其他回调。

最后,上述行为也适用于asyncio 附带的默认Task 类。但是,可以指定自定义任务工厂并使用替代任务实现,这也可以改变这种行为。

【讨论】:

以上是关于Python asyncio 任务排序的主要内容,如果未能解决你的问题,请参考以下文章

Python 3.6 asyncio - 从未检索到任务异常 - 任务的产量不好:200

Python 的 asyncio.gather() 似乎没有异步运行任务

如何使用 python 的 asyncio 模块正确创建和运行并发任务?

python 并发专题(十三):asyncio 协程中的多任务

python 并发专题(十三):asyncio 协程中的多任务

python-asyncio