在Python中附加到合并的异步生成器
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Python中附加到合并的异步生成器相关的知识,希望对你有一定的参考价值。
我正在尝试在Python 3.7中合并一堆异步生成器,同时仍在迭代中添加新的异步生成器。我目前正在使用aiostream
合并我的生成器:
from asyncio import sleep, run
from aiostream.stream import merge
async def go():
yield 0
await sleep(1)
yield 50
await sleep(1)
yield 100
async def main():
tasks = merge(go(), go(), go())
async for v in tasks:
print(v)
if __name__ == '__main__':
run(main())
但是,一旦循环开始,我必须能够继续添加正在运行的任务。有点像。
from asyncio import sleep, run
from aiostream.stream import merge
async def go():
yield 0
await sleep(1)
yield 50
await sleep(1)
yield 100
async def main():
tasks = merge(go(), go(), go())
async for v in tasks:
if v == 50:
tasks.merge(go())
print(v)
if __name__ == '__main__':
run(main())
我最接近的是使用aiostream
库,但也许也可以仅使用本机asyncio
标准库来编写得很整齐。
答案
这里是一个即使有大量异步迭代器也应有效工作的实现:
class merge:
def __init__(self, *iterables):
self._iterables = list(iterables)
self._wakeup = asyncio.Event()
def _add_iters(self, next_futs, on_done):
for it in self._iterables:
it = it.__aiter__()
nfut = asyncio.ensure_future(it.__anext__())
nfut.add_done_callback(on_done)
next_futs[nfut] = it
del self._iterables[:]
return next_futs
async def __aiter__(self):
done = {}
next_futs = {}
def on_done(nfut):
done[nfut] = next_futs.pop(nfut)
self._wakeup.set()
self._add_iters(next_futs, on_done)
try:
while next_futs:
await self._wakeup.wait()
self._wakeup.clear()
for nfut, it in done.items():
try:
ret = nfut.result()
except StopAsyncIteration:
continue
self._iterables.append(it)
yield ret
done.clear()
if self._iterables:
self._add_iters(next_futs, on_done)
finally:
# if the generator exits with an exception, or if the caller stops
# iterating, make sure our callbacks are removed
for nfut in next_futs:
nfut.remove_done_callback(on_done)
def append_iter(self, new_iter):
self._iterables.append(new_iter)
self._wakeup.set()
示例代码所需的唯一更改是该方法的名称为append_iter
,而不是merge
。
另一答案
这可以通过将stream.flatten与异步队列一起使用来存储新的生成器来完成。
import asyncio
from aiostream import stream
async def main():
queue = asyncio.Queue()
xs = stream.iterate(from_queue(queue))
ys = stream.flatten(xs, task_limit=5)
await queue.put(go())
await queue.put(go())
await queue.put(go())
async with ys.stream() as streamer:
async for item in streamer:
if item == 50:
await queue.put(go())
print(item)
请注意,您需要一个小的包装器才能将asyncio队列转换为可以迭代的内容
async def from_queue(queue):
while True:
yield await queue.get()
以上是关于在Python中附加到合并的异步生成器的主要内容,如果未能解决你的问题,请参考以下文章
嵌套字典。合并公共键并将值附加到列表中。 0 值未附加。里面的代码