调用`async for in`后如何获得异步生成器的下一次迭代

Posted

技术标签:

【中文标题】调用`async for in`后如何获得异步生成器的下一次迭代【英文标题】:How to get next itereration of async generator after calling `async for in` 【发布时间】:2020-07-17 21:40:37 【问题描述】:

使用 FastAPI,我试图检测 StreamingResponse 是否已完全被客户端使用,或者是否已被取消。

我有以下示例应用程序:

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def ainfinite_generator():
    while True:
        yield b"some fake data "
        await asyncio.sleep(.001)


async def astreamer(generator):
    try:
        async for data in generator:
            yield data
    except Exception as e:
        # this isn't triggered by a cancelled request
        print(e)
    finally:
        # this always throws a StopAsyncIteration exception
        # no matter whether the generator was consumed or not
        leftover = await generator.__anext__()
        if leftover:
            print("we didn't finish")
        else:
            print("we finished")


@app.get("/")
async def infinite_stream():
    return StreamingResponse(astreamer(ainfinite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

似乎astreamer 中的第一个async for in generator “消耗”了异步生成器。在该循环之后,进一步尝试进行下一次迭代失败并出现StopAsyncIteration 异常,即使生成器是上面定义的“无限”。

我查看了PEP-525,我唯一看到的是,如果将异常抛出到生成器中,它将导致任何进一步尝试从生成器中读取以抛出 StopAsyncIteration 异常,但我没有看看会发生什么。至少,我在 Starlette 的 StreamingResponse class 中没有看到这一点(它似乎与“内容”没有多大关系)。执行async for in gen 后生成器不会“释放”吗?

【问题讨论】:

您的astreamer 是如何离开async for 的? 好问题。我不完全确定。看起来它可能会在这里被此代码取消:github.com/encode/starlette/blob/master/starlette/… 之后它会在这里被取消:github.com/encode/starlette/blob/… 在较新的 Python 中,CancelledError 不再继承自 Exception,而是继承自 BaseException,这就是您在 except 子句中看不到它的原因。由于注入生成器的异常有效地离开了while 循环,因此生成器无法继续执行... 嗯...我不知道 CancelledError ,这完美地解释了为什么我无法赶上取消。那肯定是一种更清洁的方法。我不确定我是否完全遵循 while 循环的流程,除非该异常实际上被注入到生成器中。我不确定那会发生在哪里。 CancelledError 被注入到任何被取消的协程中,只是在它等待某事的地方(例如 sleep(.001) 在你的情况下)。所以“注入”只是意味着,当有人取消一个协程时,它被暂停的await 只是神奇地(来自协程的 POV)恢复并引发了一个CancelledError。此错误会传播到您的使用者,并且这样的生成器无法继续,因为它已完成执行,它引发了异常。 【参考方案1】:

下面的代码展示了如何监视协程(在我的例子中是异步生成器)上的取消。如 cmets 中所述,如果取消异步生成器,它会将异常注入生成器,从那时起,任何尝试获取生成器中的下一个项目都会引发 StopAsyncIteration 异常。见PEP 525。要确定异步生成器是否被取消,只需尝试/排除asyncio.CancelledError 异常(源自BaseException)。

这里还有一些代码用于展示如何处理普通的生成器,这些代码更宽容一些。如果您保持相同的 try/except 流程,如果它们被取消,则会引发 GeneratorExit 异常。

棘手的部分是这些异常中的大多数都派生自 BaseException 类,这与我期待的 StopIteration 异常不同,后者派生自 Exception 类。

顺便说一句,实际取消发生在starlette。

import asyncio
import time

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def infinite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    while True:
        yield b"some fake data "


def finite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    x = 0
    while x < 10000:
        yield f"x"
        x += 1


async def astreamer(generator):
    try:
        # if it was an async generator we'd do:
        # "async for data in generator:"
        # (there is no yield from async_generator)
        for i in generator:
            yield i
            await asyncio.sleep(.001)

    except asyncio.CancelledError as e:
        print('cancelled')


def streamer(generator):
    try:
        # note: normally we would do "yield from generator"
        # but that won't work with next(generator) in the finally statement
        for i in generator:
            yield i
            time.sleep(.001)

    except GeneratorExit:
        print("cancelled")
    finally:
        # showing that we can check here to see if all data was consumed
        # the except statement above effectively does the same thing
        try:
            next(generator)
            print("we didn't finish")
            return
        except StopIteration:
            print("we finished")


@app.get("/infinite")
async def infinite_stream():
    return StreamingResponse(streamer(infinite_generator()))


@app.get("/finite")
async def finite_stream():
    return StreamingResponse(streamer(finite_generator()))


@app.get("/ainfinite")
async def infinite_stream():
    return StreamingResponse(astreamer(infinite_generator()))


@app.get("/afinite")
async def finite_stream():
    return StreamingResponse(astreamer(finite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

【讨论】:

以上是关于调用`async for in`后如何获得异步生成器的下一次迭代的主要内容,如果未能解决你的问题,请参考以下文章

springboot+async异步接口实现和调用

nodejs之async异步编程

springboot 的异步调用 @Async注解

Spring Boot2.0之@Async实现异步调用

C#:异步编程中的 async 和 await

如何在 Cloud Functions for Firebase 中使 HTTP 请求异步/等待?