asyncio:只能在前一个任务达到预定义阶段时启动任务吗?

Posted

技术标签:

【中文标题】asyncio:只能在前一个任务达到预定义阶段时启动任务吗?【英文标题】:asyncio: can a task only start when previous task reach a pre-defined stage? 【发布时间】:2021-11-25 17:57:50 【问题描述】:

我从 asyncio 开始,希望应用于以下问题:

数据被分割成块。 第一次压缩块。 然后将压缩后的chunk写入文件中。 单个文件用于所有块,所以我需要一个一个地处理它们。
with open('my_file', 'w+b') as f:
    for chunk in chunks:
        compress_chunk(ch)
        f.write(ch)

从这个上下文来看,为了更快地运行这个过程,一旦当前迭代的write步骤开始,是否也可以触发下一次迭代的compress步骤?

我可以用asyncio 做到这一点,保持类似的for 循环结构吗?如果是的话,你能分享一些关于这方面的建议吗?

我猜另一种并行运行的方法是使用ProcessPoolExecutor 并将compress 阶段与write 阶段完全分开。这意味着在不同的执行程序中压缩第一个所有块。

只有当所有块都被压缩后,才开始写入步骤。 但如果有意义的话,我想用asyncio 1st 研究第一种方法。

提前感谢您的帮助。 最好的

【问题讨论】:

也许是答案的开始:***.com/questions/69331788/synchronize-asyncio-queue 【参考方案1】:

您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用aioFiles 库。标准文件 IO 将主要阻塞您的主压缩/生产者线程,您不会看到太多加速。试试这样的:

async def produce(queue, chunks):
    for chunk in chunks:
        compress_chunk(ch)
        await queue.put(i)


async def consume(queue):
    with async with aiofiles.open('my_file', 'w') as f:
        while True:
            compressed_chunk = await Q.get()
            await f.write(b'Hello, World!')
            queue.task_done()


async def main():
    queue = asyncio.Queue()

    producer = asyncio.create_task(producer(queue, chunks))
    consumer = asyncio.create_task(consumer(queue))

    # wait for the producer to finish
    await producer

    # wait for the consumer to finish processing and cancel it
    await queue.join()
    consumer.cancel()
 
asyncio.run(main())

https://github.com/Tinche/aiofiles

Using asyncio.Queue for producer-consumer flow

【讨论】:

非常感谢@Andrew-Harelson 的帮助。看来我也可以使用 fsspec 而不是 aioFiles (filesystem-spec.readthedocs.io/en/latest/async.html) 我会对此进行调查!再次感谢!

以上是关于asyncio:只能在前一个任务达到预定义阶段时启动任务吗?的主要内容,如果未能解决你的问题,请参考以下文章

Asyncio之EventLoop笔记

9Python Asyncio异步编程-事件循环详解

8Python Asyncio异步编程-事件循环详解

单线程多任务异步抓取(asyncio)

Python 的 asyncio.gather() 似乎没有异步运行任务

异步IO框架:asyncio 中篇