Python asyncio:处理潜在的无限列表

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python asyncio:处理潜在的无限列表相关的知识,希望对你有一定的参考价值。

我有以下场景:

  • Python 3.6+
  • 输入数据是逐行读取的。
  • 协程将数据发送到API(使用aiohttp)并将调用结果保存到Mongo(使用motor)。所以有很多IO正在进行中。

该代码使用async / await编写,适用于手动执行的单个调用。

我不知道该怎么做就是整体消耗输入数据。

我见过的所有asyncio示例都通过发送有限列表作为参数来演示asyncio.wait。但我不能简单地向其发送任务列表,因为输入文件可能有数百万行。

我的方案是通过传送带将数据流式传输给消费者。

我还可以做些什么?我希望程序使用它可以集合的所有资源来处理文件中的数据,但不会让人不知所措。

答案

我的方案是通过传送带将数据流式传输给消费者。我还可以做些什么?

您可以创建大致相当于传送带容量的固定数量的任务,然后将它们从queue中弹出。例如:

async def consumer(queue):
    while True:
        line = await queue.get()
        # connect to API, Mongo, etc.
        ...
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consume(queue)) for _ in range(N_TASKS)]
    try:
        with open('input') as f:
            for line in f:
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()

因为,与线程不同,任务是轻量级的,并且不会占用操作系统资源,所以在创建“太多”它们时犯错是可以的。 asyncio可以毫不费力地处理成千上万的任务,虽然这对于这项任务来说可能有点过头了 - 数十个就足够了。

以上是关于Python asyncio:处理潜在的无限列表的主要内容,如果未能解决你的问题,请参考以下文章

Python3 Asyncio 在并发任务之间共享资源

在 Python 中使用 asyncio 并行化 Web 任务

如何使用 asyncio 同时运行无限循环?

Python 多处理线程 Asyncio

并行请求在使用 asyncio 恰好 100 个请求后无限阻塞

Python之asyncio模块的使用