用于组合异步迭代器的映射、过滤器和迭代工具

Posted

技术标签:

【中文标题】用于组合异步迭代器的映射、过滤器和迭代工具【英文标题】:Map, filter and itertools for composing asynchronous iterators 【发布时间】:2018-05-31 12:19:32 【问题描述】:

Python 是否支持异步迭代器上的函数式操作?我知道我可以使用mapfilteritertools 来懒惰地转换和使用来自普通生成器的数据:

from itertools import accumulate, takewhile

def generator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b

# create another iterator, no computation is started yet:
another_iterator = takewhile(lambda x: x < 100, accumulate(generator()))
# start consuming data:
print(list(another_iterator))
# [1, 2, 4, 7, 12, 20, 33, 54, 88]

现在,Python 3.6 的异步生成器/迭代器不支持同样的事情,因为它们当然没有实现正常的迭代器协议:

async def agenerator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b

accumulate(agenerator())

TypeError: 'async_generator' 对象不可迭代

是否有某种异步映射或异步迭代工具来实现 Python 3.6/3.7 中类似的惰性行为?

【问题讨论】:

【参考方案1】:

我看到的最完整的 itertools 异步版本是 aiostream 模块。您的示例将是:

import asyncio
from aiostream.stream import takewhile, accumulate, list as alist


async def agenerator():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b


async def main():
    another_iterator = takewhile(
        accumulate(agenerator()),
        lambda x: x < 100, 
    )

    res = await alist(another_iterator)

    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

【讨论】:

谢谢,这看起来很有趣,尤其是 list 的异步版本。我正在研究图书馆,虽然它是 0.3.0,但它似乎相当成熟。

以上是关于用于组合异步迭代器的映射、过滤器和迭代工具的主要内容,如果未能解决你的问题,请参考以下文章

python模块分析之itertools

提高python迭代器的速度

Python过滤器/最大组合 - 检查空迭代器

在 Rust 中迭代、过滤和映射数据从一个结构到另一个结构

ES6 数组map(映射)reduce(汇总)filter(过滤器)forEach(循环迭代)

itertools常用函数