使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成

Posted

技术标签:

【中文标题】使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成【英文标题】:When using asyncio, how do you allow all running tasks to finish before shutting down the event loop 【发布时间】:2015-03-03 23:31:47 【问题描述】:

我有以下代码:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

我运行这个函数直到完成。设置关闭时会出现问题 - 功能完成并且任何挂起的任务都不会运行。

这是错误:

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>

如何正确安排关机时间?

为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从 /proc/stat 读取一次,计算该期间的 cpu 使用率,然后将结果发送到服务器。我想继续安排这些监控作业,直到我收到 sigterm,当我停止安排时,等待所有当前作业完成,然后优雅地退出。

【问题讨论】:

为了提供一些上下文,我正在编写一个系统监视器,它每 5 秒从 /proc/stat 读取一次,计算该期间的 cpu 使用率,然后将结果发送到服务器。我想继续安排这些监控作业,直到我收到 sigterm,当我停止安排时,等待所有当前作业完成,然后优雅地退出。 你试过yield from my_expensive_operation() \n yield from asyncio.sleep(my_interval - timer() % my_interval)吗? 我可以睡足够长的时间,我知道一切都已经完成,但这似乎不是很干净。我想知道是否有办法安排任务,然后运行循环,直到所有预定任务完成。在 javascript (node.js) 中,如果主程序到达结束但设置了回调,则进程运行直到所有回调都被删除。 哦,对不起,我明白你的意思了——你的意思是不使用异步进行调度,而是让当前进程等到前一个进程完成。感觉就像你应该能够做我想做的事情(安排任务),然后等到它们全部完成。 保留async() 返回的期货(删除已完成的作业)。原则上,您可以获得所有当前的 Task 实例(可能有一个类属性)。 【参考方案1】:

您可以检索未完成的任务并再次运行循环,直到它们完成,然后关闭循环或退出您的程序。

pending = asyncio.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
pending 是待处理任务的列表。 asyncio.gather() 允许同时等待多个任务。

如果你想确保所有任务都在一个协程中完成(也许你有一个“主”协程),你可以这样做,例如:

async def do_something_periodically():
    while True:
        asyncio.create_task(my_expensive_operation())
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await asyncio.gather(*asyncio.all_tasks())

此外,在这种情况下,由于所有任务都是在同一个协程中创建的,因此您已经可以访问这些任务:

async def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.create_task(my_expensive_operation()))
        await asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    await asyncio.gather(*tasks)

【讨论】:

非常有帮助!关于第二种方法的注意事项:我认为您附加到列表的每个任务都代表一个打开的文件描述符 - 这意味着在(比如说)Linux 上,您可能会达到打开文件的限制 (@987654327 @) 在协程完成之前。 我发现,使用第二种方法时,我收到有关打开文件描述符过多的错误消息。我认为每个任务都需要一个文件描述符才能工作。请注意,“文件描述符”与打开的文件不同,它们也可能是 select() 调用使用的那些(我相信 asyncio 库使用)。因此,如果您有几千个打开文件描述符的用户限制,以及更多的任务,您可能会遇到问题。 我可以确认 asyncio 为自己使用而打开的唯一文件描述符是选择器和自管道,因此是 3 个文件描述符。 Task 对象本身不包含任何资源对象,因此它一定是一个不相关的错误。 请注意,如果其中一个任务因异常而失败, asyncio.gather() 将不会取消剩余的任务。他们剩余的任务将处于未完成状态,并且不会运行异常处理程序来清理。传递给 gather() 的任务也可能会创建不会等待的新任务。您需要手动跟踪所有启动的任务并在异常处理程序中取消它们,或者重复调用收集直到没有剩余(或取消新启动的任务)。 您的第二个示例不会造成死锁吗?主要任务等待所有其他任务完成,但它本身就是这些任务之一,因此永远不会发生。对吗?【参考方案2】:

从 Python 3.7 开始,上述答案使用了多个已弃用的 API(asyncio.async 和 Task.all_tasks、@asyncio.coroutine、yield from 等),您应该使用它: p>

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        await asyncio.sleep(interval)


loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)

try:
    loop.run_until_complete(coro)
except KeyboardInterrupt:
    coro.close()
    tasks = asyncio.all_tasks(loop)
    expensive_tasks = task for task in tasks if task._coro.__name__ != coro.__name__
    loop.run_until_complete(asyncio.gather(*expensive_tasks))

【讨论】:

shutdown_flag_is_set 标志永远不会在 do_something_periodically 中设置。 KeyboardInterrupt 已经导致 do_something_periodically 退出 没错,我添加了另一种方法【参考方案3】:

您也可以考虑使用asyncio.shield,尽管这样做您不会所有完成正在运行的任务,而只会屏蔽。但它在某些情况下仍然很有用。

除此之外,从 Python 3.7 开始,我们还可以在此处使用高级 API 方法 asynio.run。作为 Python 核心开发人员,Yury Selivanov 建议: https://youtu.be/ReXxO_azV-w?t=636注意: asyncio.run 函数已暂时添加到 Python 3.7 中的 asyncio 中。

希望有帮助!

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        # using asyncio.shield
        await asyncio.shield(asyncio.sleep(interval))


coro = do_something_periodically(1, 1)

if __name__ == "__main__":
    try:
        # using asyncio.run
        asyncio.run(coro)
    except KeyboardInterrupt:
        print('Cancelled!')

【讨论】:

【参考方案4】:

使用包装协程,等待待处理任务计数为 1 后再返回。

async def loop_job():
    asyncio.create_task(do_something_periodically())
    while len(asyncio.Task.all_tasks()) > 1:  # Any task besides loop_job() itself?
        await asyncio.sleep(0.2)

asyncio.run(loop_job())

【讨论】:

【参考方案5】:

我不确定这是否是您所要求的,但我遇到了类似的问题,这是我想出的最终解决方案。

代码与 python 3 兼容,并且仅使用公共 asyncio API(意味着没有 hacky _coro 和不推荐使用的 API)。

import asyncio

async def fn():
  await asyncio.sleep(1.5)
  print('fn')

async def main():
    print('main start')
    asyncio.create_task(fn()) # run in parallel
    await asyncio.sleep(0.2)
    print('main end')


def async_run_and_await_all_tasks(main):
  def get_pending_tasks():
      tasks = asyncio.Task.all_tasks()
      pending = [task for task in tasks if task != run_main_task and not task.done()]
      return pending

  async def run_main():
      await main()

      while True:
          pending_tasks = get_pending_tasks()
          if len(pending_tasks) == 0: return
          await asyncio.gather(*pending_tasks)

  loop = asyncio.new_event_loop()
  run_main_coro = run_main()
  run_main_task = loop.create_task(run_main_coro)
  loop.run_until_complete(run_main_task)

# asyncio.run(main()) # doesn't print from fn task, because main finishes earlier
async_run_and_await_all_tasks(main)

输出(如预期):

main start
main end
fn

async_run_and_await_all_tasks 函数将使 python 以 nodejs 方式运行:仅在没有未完成的任务时退出。

【讨论】:

【参考方案6】:

如果您想要一种干净的方式来等待在某个本地范围内创建的所有正在运行的任务而不会泄漏内存(并且同时防止garbage collection errors),您可以维护一组正在运行的任务并使用task.add_done_callback(...) 将任务从集。这是一个为您处理此问题的类:

class TaskSet:
    def __init__(self):
        self.tasks = set()

    def add(self, coroutine: Coroutine) -> Task:
        task = asyncio.create_task(coroutine)
        self.tasks.add(task)
        task.add_done_callback(lambda _: self.tasks.remove(task))
        return task

    def __await__(self):
        return asyncio.gather(*self.tasks).__await__()

可以这样使用:

async def my_function():
    await asyncio.sleep(0.5)


async def go():
    tasks = TaskSet()
    for i in range(10):
        tasks.add(my_function())
    await tasks

【讨论】:

【参考方案7】:

我注意到一些使用asyncio.gather(*asyncio.all_tasks()) 建议的答案,但问题有时可能是一个无限循环,它等待asyncio.current_task() 完成,这本身就是。一些答案提出了一些复杂的解决方法,包括检查coro 名称或len(asyncio.all_tasks()),但事实证明,利用set 操作非常简单:

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all other tasks to finish other than the current task i.e. main().
    await asyncio.gather(*asyncio.all_tasks() - asyncio.current_task())

【讨论】:

恕我直言,截至 2021 年最好的一个。 这是完美的。谢谢。

以上是关于使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 python 的 asyncio 模块正确创建和运行并发任务?

当我清楚地表明我只想完成第一个任务时,为什么所有任务都在asyncio.wait()中完成?

《asyncio 系列》4. 如何并发运行多个任务(asyncio.gatherasyncio.as_completedasyncio.wait)

Python websockets 服务器和 websockets 客户端在运行这两个任务时使用 asyncio 断言错误

所有任务完成后如何终止python asyncio event_loop

asyncio "任务已被破坏,但它正在等待处理!"在 pysnmp 示例程序中