如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件

Posted

技术标签:

【中文标题】如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件【英文标题】:How to send server-side events from python (fastapi) upon calls to a function that updates the backend state 【发布时间】:2019-11-16 21:44:04 【问题描述】:

我有以下问题:给定一个运行 fastapi 的后端,它有一个流式端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(可以是由计划的作业或被击中并导致状态更新的不同端点)。

我想要实现的一个简单版本是:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: \n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")

我希望它永远运行下去。每次状态更新时,event_stream 都会解除阻塞并发送消息。

我看过线程和异步,但我觉得我缺少一些关于如何在 python 中执行此操作的简单概念。

【问题讨论】:

【参考方案1】:

FastAPI 基于 Starlette,Server-Sent Events 插件可用于 Starlette

import asyncio
import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

MESSAGE_STREAM_DELAY = 1  # second
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # milisecond
app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages(): ...
    async def event_generator():
        while True:
            # If client was closed the connection
            if await request.is_disconnected():
                break

            # Checks for new messages and return them to client if any
            if new_messages():
                yield 
                        "event": "new_message",
                        "id": "message_id",
                        "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
                        "data": "message_content"
                

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)

【讨论】:

【参考方案2】:

我能找到解决此问题的最简单方法是使用threading.Condition

因此变成了:

import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            condition.notify()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: \n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")




【讨论】:

如果updateevent_stream 注意到新消息之前被调用两次,此代码会丢失消息吗?看看queue 作为条件变量的更高级替代方案。 update 可以将消息放入队列,event_stream 将它们从队列中弹出。您会得到与现在相同的行为,但不会丢失消息并且代码更容易推理。

以上是关于如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件的主要内容,如果未能解决你的问题,请参考以下文章

Boost python,嵌入时从python调用c++函数

在另一个函数中调用列表 - Python [重复]

如何在创建对象时从 QML 调用任意 C++ 函数?

如何在 ajax 调用时从 laravel 控制器函数返回视图?

如何确保更新 reactjs 状态,然后调用函数?

如何访问已调用 setState 的函数主体中的更新状态?