在Python中附加到合并的异步生成器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Python中附加到合并的异步生成器相关的知识,希望对你有一定的参考价值。

我正在尝试在Python 3.7中合并一堆异步生成器,同时仍在迭代中添加新的异步生成器。我目前正在使用aiostream合并我的生成器:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        print(v)

if __name__ == '__main__':
    run(main())

但是,一旦循环开始,我必须能够继续添加正在运行的任务。有点像。

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        if v == 50:
            tasks.merge(go())
        print(v)

if __name__ == '__main__':
    run(main())

我最接近的是使用aiostream库,但也许也可以仅使用本机asyncio标准库来编写得很整齐。

答案

这里是一个即使有大量异步迭代器也应有效工作的实现:

class merge:
    def __init__(self, *iterables):
        self._iterables = list(iterables)
        self._wakeup = asyncio.Event()

    def _add_iters(self, next_futs, on_done):
        for it in self._iterables:
            it = it.__aiter__()
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        del self._iterables[:]
        return next_futs

    async def __aiter__(self):
        done = {}
        next_futs = {}
        def on_done(nfut):
            done[nfut] = next_futs.pop(nfut)
            self._wakeup.set()

        self._add_iters(next_futs, on_done)
        try:
            while next_futs:
                await self._wakeup.wait()
                self._wakeup.clear()
                for nfut, it in done.items():
                    try:
                        ret = nfut.result()
                    except StopAsyncIteration:
                        continue
                    self._iterables.append(it)
                    yield ret
                done.clear()
                if self._iterables:
                    self._add_iters(next_futs, on_done)
        finally:
            # if the generator exits with an exception, or if the caller stops
            # iterating, make sure our callbacks are removed
            for nfut in next_futs:
                nfut.remove_done_callback(on_done)

    def append_iter(self, new_iter):
        self._iterables.append(new_iter)
        self._wakeup.set()

示例代码所需的唯一更改是该方法的名称为append_iter,而不是merge

另一答案

这可以通过将stream.flatten与异步队列一起使用来存储新的生成器来完成。

import asyncio
from aiostream import stream

async def main():
    queue = asyncio.Queue()
    xs = stream.iterate(from_queue(queue))
    ys = stream.flatten(xs, task_limit=5)

    await queue.put(go())
    await queue.put(go())
    await queue.put(go())

    async with ys.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)

请注意,您需要一个小的包装器才能将asyncio队列转换为可以迭代的内容

async def from_queue(queue):
    while True:
        yield await queue.get()

以上是关于在Python中附加到合并的异步生成器的主要内容,如果未能解决你的问题,请参考以下文章

片段未附加到上下文 - 延迟的 UI 更改

嵌套字典。合并公共键并将值附加到列表中。 0 值未附加。里面的代码

java 简单的代码片段,展示如何将javaagent附加到运行JVM进程

Python代码阅读(第19篇):合并多个字典

片段未附加到上下文

Android在活动视图中附加片段获取片段已添加错误