调用`async for in`后如何获得异步生成器的下一次迭代
Posted
技术标签:
【中文标题】调用`async for in`后如何获得异步生成器的下一次迭代【英文标题】:How to get next itereration of async generator after calling `async for in` 【发布时间】:2020-07-17 21:40:37 【问题描述】:使用 FastAPI,我试图检测 StreamingResponse 是否已完全被客户端使用,或者是否已被取消。
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
似乎astreamer
中的第一个async for in generator
“消耗”了异步生成器。在该循环之后,进一步尝试进行下一次迭代失败并出现StopAsyncIteration
异常,即使生成器是上面定义的“无限”。
我查看了PEP-525,我唯一看到的是,如果将异常抛出到生成器中,它将导致任何进一步尝试从生成器中读取以抛出 StopAsyncIteration 异常,但我没有看看会发生什么。至少,我在 Starlette 的 StreamingResponse class 中没有看到这一点(它似乎与“内容”没有多大关系)。执行async for in gen
后生成器不会“释放”吗?
【问题讨论】:
您的astreamer
是如何离开async for
的?
好问题。我不完全确定。看起来它可能会在这里被此代码取消:github.com/encode/starlette/blob/master/starlette/… 之后它会在这里被取消:github.com/encode/starlette/blob/…
在较新的 Python 中,CancelledError
不再继承自 Exception
,而是继承自 BaseException
,这就是您在 except
子句中看不到它的原因。由于注入生成器的异常有效地离开了while
循环,因此生成器无法继续执行...
嗯...我不知道 CancelledError ,这完美地解释了为什么我无法赶上取消。那肯定是一种更清洁的方法。我不确定我是否完全遵循 while 循环的流程,除非该异常实际上被注入到生成器中。我不确定那会发生在哪里。
CancelledError
被注入到任何被取消的协程中,只是在它等待某事的地方(例如 sleep(.001)
在你的情况下)。所以“注入”只是意味着,当有人取消一个协程时,它被暂停的await
只是神奇地(来自协程的 POV)恢复并引发了一个CancelledError
。此错误会传播到您的使用者,并且这样的生成器无法继续,因为它已完成执行,它引发了异常。
【参考方案1】:
下面的代码展示了如何监视协程(在我的例子中是异步生成器)上的取消。如 cmets 中所述,如果取消异步生成器,它会将异常注入生成器,从那时起,任何尝试获取生成器中的下一个项目都会引发 StopAsyncIteration
异常。见PEP 525。要确定异步生成器是否被取消,只需尝试/排除asyncio.CancelledError
异常(源自BaseException
)。
这里还有一些代码用于展示如何处理普通的生成器,这些代码更宽容一些。如果您保持相同的 try/except 流程,如果它们被取消,则会引发 GeneratorExit
异常。
棘手的部分是这些异常中的大多数都派生自 BaseException
类,这与我期待的 StopIteration
异常不同,后者派生自 Exception
类。
顺便说一句,实际取消发生在starlette。
import asyncio
import time
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
def infinite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
while True:
yield b"some fake data "
def finite_generator():
# not blocking, so doesn't need to be async
# but if it was blocking, you could make this async and await it
x = 0
while x < 10000:
yield f"x"
x += 1
async def astreamer(generator):
try:
# if it was an async generator we'd do:
# "async for data in generator:"
# (there is no yield from async_generator)
for i in generator:
yield i
await asyncio.sleep(.001)
except asyncio.CancelledError as e:
print('cancelled')
def streamer(generator):
try:
# note: normally we would do "yield from generator"
# but that won't work with next(generator) in the finally statement
for i in generator:
yield i
time.sleep(.001)
except GeneratorExit:
print("cancelled")
finally:
# showing that we can check here to see if all data was consumed
# the except statement above effectively does the same thing
try:
next(generator)
print("we didn't finish")
return
except StopIteration:
print("we finished")
@app.get("/infinite")
async def infinite_stream():
return StreamingResponse(streamer(infinite_generator()))
@app.get("/finite")
async def finite_stream():
return StreamingResponse(streamer(finite_generator()))
@app.get("/ainfinite")
async def infinite_stream():
return StreamingResponse(astreamer(infinite_generator()))
@app.get("/afinite")
async def finite_stream():
return StreamingResponse(astreamer(finite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
【讨论】:
以上是关于调用`async for in`后如何获得异步生成器的下一次迭代的主要内容,如果未能解决你的问题,请参考以下文章