asyncio:只能在前一个任务达到预定义阶段时启动任务吗?
Posted
技术标签:
【中文标题】asyncio:只能在前一个任务达到预定义阶段时启动任务吗?【英文标题】:asyncio: can a task only start when previous task reach a pre-defined stage? 【发布时间】:2021-11-25 17:57:50 【问题描述】:我从 asyncio 开始,希望应用于以下问题:
数据被分割成块。 第一次压缩块。 然后将压缩后的chunk写入文件中。 单个文件用于所有块,所以我需要一个一个地处理它们。with open('my_file', 'w+b') as f:
for chunk in chunks:
compress_chunk(ch)
f.write(ch)
从这个上下文来看,为了更快地运行这个过程,一旦当前迭代的write
步骤开始,是否也可以触发下一次迭代的compress
步骤?
我可以用asyncio
做到这一点,保持类似的for
循环结构吗?如果是的话,你能分享一些关于这方面的建议吗?
我猜另一种并行运行的方法是使用ProcessPoolExecutor
并将compress
阶段与write
阶段完全分开。这意味着在不同的执行程序中压缩第一个所有块。
只有当所有块都被压缩后,才开始写入步骤。
但如果有意义的话,我想用asyncio
1st 研究第一种方法。
提前感谢您的帮助。 最好的
【问题讨论】:
也许是答案的开始:***.com/questions/69331788/synchronize-asyncio-queue 【参考方案1】:您可以使用生产者-消费者模型来做到这一点。只要有一个生产者和一个消费者,你就会有正确的顺序。对于您的用例,这就是您将从中受益的全部。此外,您应该使用aioFiles
库。标准文件 IO 将主要阻塞您的主压缩/生产者线程,您不会看到太多加速。试试这样的:
async def produce(queue, chunks):
for chunk in chunks:
compress_chunk(ch)
await queue.put(i)
async def consume(queue):
with async with aiofiles.open('my_file', 'w') as f:
while True:
compressed_chunk = await Q.get()
await f.write(b'Hello, World!')
queue.task_done()
async def main():
queue = asyncio.Queue()
producer = asyncio.create_task(producer(queue, chunks))
consumer = asyncio.create_task(consumer(queue))
# wait for the producer to finish
await producer
# wait for the consumer to finish processing and cancel it
await queue.join()
consumer.cancel()
asyncio.run(main())
https://github.com/Tinche/aiofiles
Using asyncio.Queue for producer-consumer flow
【讨论】:
非常感谢@Andrew-Harelson 的帮助。看来我也可以使用 fsspec 而不是 aioFiles (filesystem-spec.readthedocs.io/en/latest/async.html) 我会对此进行调查!再次感谢!以上是关于asyncio:只能在前一个任务达到预定义阶段时启动任务吗?的主要内容,如果未能解决你的问题,请参考以下文章