如何使用 asyncio 同时运行无限循环?

Posted

技术标签:

【中文标题】如何使用 asyncio 同时运行无限循环?【英文标题】:How to concurrently run a infinite loop with asyncio? 【发布时间】:2017-12-11 03:20:29 【问题描述】:

等待时如何继续下一个循环?例如:

async def get_message():
    # async get message from queue
    return message

async process_message(message):
    # make some changes on message
    return message

async def deal_with_message(message):
    # async update some network resource with given message

async def main():
    while True:
        message = await get_message()
        message = await process_message(message)
        await deal_with_message(message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

如何使while True 循环并发?如果它正在等待deal_with_message,它可以进入下一个循环并运行get_message

已编辑

我想我找到了解决办法:

async def main():
    asyncio.ensure_future(main())
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)

loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()

【问题讨论】:

虽然我发现它按我的预期工作,但我认为它不是一种pythonical方式,有没有更好的方法?这个解决方案和nodejs中的process.nextTick()是一样的。 【参考方案1】:

您的解决方案会起作用,但我发现它有问题。

async def main():
    asyncio.ensure_future(main())
    # task finishing

main 启动后,它会立即创建新任务并立即发生(ensure_future 立即创建任务),这与实际完成此任务需要时间不同。我想这可能会导致创建大量任务,从而耗尽您的 RAM。

此外,这意味着可能有大量任务可以同时运行。它会耗尽您的网络吞吐量或可以同时打开的套接字数量(想象一下您要并行下载 1 000 000 个 url - 不会发生任何好事)。

在并发世界中,这个问题通常是can be solved,通过使用Semaphore 之类的东西来限制可以同时运行的具有某些合理值的事物的数量。但是,在您的情况下,我认为手动跟踪正在运行的任务数量并手动填充它更方便:

import asyncio
from random import randint


async def get_message():
    message = randint(0, 1_000)
    print(f'message got')
    return message


async def process_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'message processed')
    return message


async def deal_with_message(message):
    await asyncio.sleep(randint(1, 5))
    print(f'message dealt')


async def utilize_message():
    message = await get_message()
    message = await process_message(message)
    await deal_with_message(message)


parallel_max = 5  # don't utilize more than 5 msgs parallely
parallel_now = 0


def populate_tasks():
    global parallel_now
    for _ in range(parallel_max - parallel_now):
        parallel_now += 1
        task = asyncio.ensure_future(utilize_message())
        task.add_done_callback(on_utilized)


def on_utilized(_):
    global parallel_now
    parallel_now -= 1
    populate_tasks()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        populate_tasks()
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

输出如下:

939 got
816 got
737 got
257 got
528 got
939 processed
816 processed
528 processed
816 dealt
589 got
939 dealt
528 dealt
712 got
263 got
737 processed
257 processed
263 processed
712 processed
263 dealt
712 dealt
386 got
708 got
589 processed
257 dealt
386 processed
708 processed
711 got
711 processed

这里重要的部分是我们如何在运行的任务数量减少到少于五个后才使用下一条消息。

更新:

是的,如果您不需要动态更改最大运行数,信号量似乎更方便。

sem = asyncio.Semaphore(5)


async def main():
    async with sem:
        asyncio.ensure_future(main())
        await utilize_message()


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        asyncio.ensure_future(main())
        loop.run_forever()
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

【讨论】:

非常感谢。我已经仔细阅读了您的答案,我认为如果使用 Semaphore 会更方便。我只需要在main 开头获取,在main 结尾释放。我理解对了吗? @Sraw 是的,一旦你写了它,我就明白Semaphore 比我建议的第一个版本更好。我用使用它的例子更新了答案:你可以使用async with,而不是手动获取/释放。 * 你必须使用 async with【参考方案2】:

最简单的解决方案是asyncio.ensure_future

async def main():
    tasks = []
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        task = asyncio.ensure_future(coroutine) # starts running coroutine
        tasks.append(task)
    await asyncio.wait(tasks)

如果您的所有任务都可以在最后等待,那么您自己跟踪任务是可选的。

async def main():
    while running:
        message = await get_message()
        message = await process_message(message)
        coroutine = deal_with_message(message)
        asyncio.ensure_future(coroutine)
    tasks = asyncio.Task.all_tasks()
    await asyncio.wait(tasks)

【讨论】:

查看我的更新。而且我认为这不会起作用,因为while running 也会阻止整个进程,await asyncio.wait(tasks) 将永远不会运行。 asyncio.ensure_future(coroutine) 启动任务运行,就像您的更新一样。 asyncio.wait(tasks) 只是确保在程序退出之前所有任务都已停止。 是的,我已经意识到了。

以上是关于如何使用 asyncio 同时运行无限循环?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 python 的 asyncio 模块正确创建和运行并发任务?

使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成

如何将协程添加到正在运行的 asyncio 循环中?

使用 asyncio 运行协程后如何继续原始事件循环?

Python - 如何使用 asyncio 同时运行多个协程?

如何在 asyncio 中同时运行任务?