在 Python FastAPI 中使用 websocket 并行发送/接收
Posted
技术标签:
【中文标题】在 Python FastAPI 中使用 websocket 并行发送/接收【英文标题】:Send / receive in parallel using websockets in Python FastAPI 【发布时间】:2021-06-12 08:38:28 【问题描述】:我将尝试用一个示例来解释我在做什么,比如我正在构建一个天气客户端。浏览器通过 websocket 发送消息,例如:
"city": "Chicago",
"country": "US"
服务器每 5 分钟查询一次天气,并用最新数据更新浏览器。
现在浏览器可以发送另一条消息,例如:
"city": "Bangalore",
"country": "IN"
现在我的服务器应该停止更新芝加哥的天气详细信息并开始更新有关班加罗尔的详细信息,即同时通过 websocket 发送/接收消息。我应该如何实施?
目前我有这个,但这只会在收到事件时更新浏览器:
@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
weather_client = WeatherClient(client)
while True:
data = await websocket.receive_json()
weather = await weather_client.weather(data)
await websocket.send_json(weather.dict())
如果我将websocket.receive_json()
移到循环之外,我将无法连续收听来自浏览器的消息。我想我需要启动两个 asyncio 任务,但由于我是异步编程方式的新手,所以我不太能够确定实现。
【问题讨论】:
【参考方案1】:执行此操作的最简单方法就像您提到的将阅读移到循环之外的单独任务中。在这个范例中,您需要使用最新数据更新局部变量,使您的代码看起来像这样:
@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
json_data = await websocket.receive_json()
async def read_from_socket(websocket: WebSocket):
nonlocal json_data
async for data in websocket.iter_json():
json_data = data
asyncio.create_task(read_from_socket(websocket))
while True:
print(f"getting weather data for json_data")
await asyncio.sleep(1) # simulate a slow call to the weather service
注意,我使用了iter_json
异步生成器,这相当于receive_json
上的无限循环。
这可行,但根据您的要求可能会有错误。想象一下,天气服务需要 10 秒才能完成,在这段时间内,用户通过套接字向不同城市发送了三个请求。在上面的代码中,您只会获得用户发送的最新城市。这对您的应用程序可能没问题,但如果您需要跟踪用户发送的所有内容,则需要使用队列。在此范例中,您将有一项任务读取数据并将其放入队列,一项任务从队列中获取数据并查询天气服务。然后,您将与 gather
同时运行这些。
@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
queue = asyncio.queues.Queue()
async def read_from_socket(websocket: WebSocket):
async for data in websocket.iter_json():
print(f"putting data in the queue")
queue.put_nowait(data)
async def get_data_and_send():
data = await queue.get()
while True:
if queue.empty():
print(f"getting weather data for data")
await asyncio.sleep(1)
else:
data = queue.get_nowait()
print(f"Setting data to data")
await asyncio.gather(read_from_socket(websocket), get_data_and_send())
这样,您就不会丢失用户发送的数据。在上面的示例中,我只获取用户请求的最新天气数据,但您仍然可以访问所有发送的数据。
编辑:要在 cmets 中回答您的问题,队列方法可能最好在新请求进入时取消任务。基本上将您希望能够取消的长时间运行的任务转移到它自己的协程函数中(在这个例如read_and_send_to_client
) 并将其作为任务运行。当新数据进来时,如果该任务没有完成,则取消它,然后创建一个新的。
async def read_and_send_to_client(data):
print(f'reading data from client')
await asyncio.sleep(10) # simulate a slow call
print(f'finished reading data, sending to websocket client')
@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
await websocket.accept()
queue = asyncio.queues.Queue()
async def read_from_socket(websocket: WebSocket):
async for data in websocket.iter_json():
print(f"putting data in the queue")
queue.put_nowait(data)
async def get_data_and_send():
data = await queue.get()
fetch_task = asyncio.create_task(read_and_send_to_client(data))
while True:
data = await queue.get()
if not fetch_task.done():
print(f'Got new data while task not complete, canceling.')
fetch_task.cancel()
fetch_task = asyncio.create_task(read_and_send_to_client(data))
await asyncio.gather(read_from_socket(websocket), get_data_and_send())
【讨论】:
太棒了!这几乎正是我所寻找的,而且我是如此接近。然而,还有一件事,比如我们发回数据并在发送更新数据之前休眠 10 秒,在这之间,我们会收到一条消息。现在会发生的是我们仍然需要等待睡眠结束。我如何在收到新消息后立即在该线程上调用notify
或 cancel
唤醒?我也不知道只做create_task
就足够了,而且我们不需要在它上面做asyncio.gather
。我需要阅读更多关于asyncio
的信息。谢谢!
@mohitmayank 您需要创建一个要取消的长操作任务,如果未完成则调用取消。更新了答案以显示如何执行此操作。以上是关于在 Python FastAPI 中使用 websocket 并行发送/接收的主要内容,如果未能解决你的问题,请参考以下文章
FastAPI: 极速开发Python Web应用的未来之星