如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件
Posted
技术标签:
【中文标题】如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件【英文标题】:How to send server-side events from python (fastapi) upon calls to a function that updates the backend state 【发布时间】:2019-11-16 21:44:04 【问题描述】:我有以下问题:给定一个运行 fastapi 的后端,它有一个流式端点,用于更新前端,我想在每次调用更新后端状态的函数时发送这些更新(可以是由计划的作业或被击中并导致状态更新的不同端点)。
我想要实现的一个简单版本是:
from fastapi import FastAPI
from starlette.responses import StreamingResponse
class State:
def __init__(self):
self.messages = []
def update(self, new_messages):
self.messages = new_messages
# HERE: notify waiting stream endpoint
app = FastAPI()
state = State()
@app.get('/stream')
def stream():
def event_stream():
while True:
# HERE lies the question: wait for state to be update
for message in state.messages:
yield 'data: \n\n'.format(json.dumps(message))
return StreamingResponse(event_stream(), media_type="text/event-stream")
我希望它永远运行下去。每次状态更新时,event_stream
都会解除阻塞并发送消息。
我看过线程和异步,但我觉得我缺少一些关于如何在 python 中执行此操作的简单概念。
【问题讨论】:
【参考方案1】:FastAPI 基于 Starlette,Server-Sent Events 插件可用于 Starlette
import asyncio
import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
MESSAGE_STREAM_DELAY = 1 # second
MESSAGE_STREAM_RETRY_TIMEOUT = 15000 # milisecond
app = FastAPI()
@app.get('/stream')
async def message_stream(request: Request):
def new_messages(): ...
async def event_generator():
while True:
# If client was closed the connection
if await request.is_disconnected():
break
# Checks for new messages and return them to client if any
if new_messages():
yield
"event": "new_message",
"id": "message_id",
"retry": MESSAGE_STREAM_RETRY_TIMEOUT,
"data": "message_content"
await asyncio.sleep(MESSAGE_STREAM_DELAY)
return EventSourceResponse(event_generator())
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
【讨论】:
【参考方案2】:我能找到解决此问题的最简单方法是使用threading.Condition
。
因此变成了:
import threading
from fastapi import FastAPI
from starlette.responses import StreamingResponse
condition = threading.Condition()
class State:
def __init__(self):
self.messages = []
def update(self, new_messages):
self.messages = new_messages
with condition:
condition.notify()
app = FastAPI()
state = State()
@app.get('/stream')
def stream():
def event_stream():
while True:
with condition:
condition.wait()
for message in state.messages:
yield 'data: \n\n'.format(json.dumps(message))
return StreamingResponse(event_stream(), media_type="text/event-stream")
【讨论】:
如果update
在event_stream
注意到新消息之前被调用两次,此代码会丢失消息吗?看看queue 作为条件变量的更高级替代方案。 update
可以将消息放入队列,event_stream
将它们从队列中弹出。您会得到与现在相同的行为,但不会丢失消息并且代码更容易推理。以上是关于如何在调用更新后端状态的函数时从 python (fastapi) 发送服务器端事件的主要内容,如果未能解决你的问题,请参考以下文章
Boost python,嵌入时从python调用c++函数