深入Asyncio异步迭代器
Posted ikct2017
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入Asyncio异步迭代器相关的知识,希望对你有一定的参考价值。
Async Iterators: async for
除了async def
和await
语法外,还有一些其它的语法,本章学习异步版的for循环与迭代器,不难理解,普通迭代器是通过__iter__
和__next__
两个特殊方法实现的,如下例。
>>> class A:
... def __iter__(self): # 1
... self.x = 0 # 2
... return self # 3
... def __next__(self): # 4
... if self.x > 2:
... raise StopIteration
... else:
... self.x += 1
... return self.x
>>> for i in A():
... print(i)
...
1
2
3
迭代器必须支持__iter__方法;
值初始化;
返回一个可迭代对象,这个对象可以执行__next__方法,这里A本身就实现了__next__方法,所以返回它本身即可;
在每次迭代时调用。
将__next__()
方法声明为异步函数,要允许await
某些IO操作,除开命名的差别外,几乎是相同的定义,PEP 492中规范说明了如何实现一个异步迭代器:
1. 实现__aiter__()
方法(不需要用async def);
2. __aiter__()
方法必须返回一个支持__anext__()
方法的对象;
3. __anext__()
必须返回迭代器的每个值,并在结束迭代时抛出StopAsyncIteration
异常。
举个例子,比如Redis的key对应的value是一个很大的集合,想迭代这些key的value会出现严重的网络IO,异步迭代器可以这样实现:
import asyncio
from aioredis import create_redis
async def main(): # 1
redis = await create_redis((‘localhost‘, 6379)) # 2
keys = [‘America‘, ‘Africa‘, ‘Europe‘, ‘Asia‘] # 3
async for value in OneAtTime(redis, keys): # 4
await process(value) # 5
class OneAtTime:
def __init__(self, redis, keys): # 6
self.redis = redis
self.keys = keys
def __aiter__(self): # 7
self.ikeys = iter(self.keys)
return self
async def __anext__(self): # 8
try:
k = next(self.ikeys) # 9
except StopIteration: # 10
raise StopAsyncIteration
value = await redis.get(k) # 11
return value
asyncio.get_event_loop().run_until_complete(main())
主程序入口,用于在loop.run_until_complete()方法中调用;
使用aioredis库获取异步连接;
假设每个key对应的value实例非常大;
使用async for循环,关键点是这里的迭代器可以在等待数据时切换任务;
在得到返回值后用协程去处理这个值,假设这个函数也是IO绑定的;
用一个实例来存储redis连接和keys表;
像普通迭代器一样,初始化一些值,这里我们创建一个迭代器作值,由于这个类也重载了__anext__方法,所以直接返回自身;
__anext__
方法用async def声明;迭代这个普通的迭代器;
处理普通异常并重新抛出一个异步异常;
这个调用是网络IO,因此用await切换它。
通过以上实现,将可以用一个异步for循环来迭代一些处理网络IO的异步迭代器,好处是可以只用一个事件loop就能处理大量的数据。
以上是关于深入Asyncio异步迭代器的主要内容,如果未能解决你的问题,请参考以下文章
自己手写调度器,理解Python中的asyncio异步事件循环与协程
自己手写调度器,理解Python中的asyncio异步事件循环与协程