减少 Asyncio 中的延迟
Posted
技术标签:
【中文标题】减少 Asyncio 中的延迟【英文标题】:Reduce latency in Asyncio 【发布时间】:2020-10-01 13:27:14 【问题描述】:我有一个使用 FastAPI 的 websocket 服务器启动并运行良好。
但是,当我使用那些“等待”时,我遇到了一些延迟问题。起初,我认为这与互联网连接或我的 linux 服务器有关。但似乎 asyncio 正在等待其他任务。
这是我的代码:
import asyncio
from pydantic import BaseModel
class UserClientWebSocket(BaseModel):
id: str
ws: WebSocket
class Config:
arbitrary_types_allowed = True
class ConnectionManager:
def __init__(self):
self.active_user_client_connections: List[UserClientWebSocket] = []
self.collect_user_IDs = []
async def connect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
await websocket.accept()
await self.send_message_to_absolutely_everybody(f"User: THE_USER_ID connected to server!")
print("User: ".format(THE_USER_ID) + " Connected")
if THE_USER_ID not in self.collect_user_IDs:
self.collect_user_IDs.append(THE_USER_ID)
else:
await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: THE_USER_ID")
await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
self.collect_user_IDs.append(THE_USER_ID)
self.active_user_client_connections.append(UserClientWebSocket(ws=websocket, id=THE_USER_ID))
await self.show_number_of_clients()
async def function_one_send_message_to_absolutely_everybody(self, message: str):
try:
await asyncio.sleep(2)
await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_webpage_client_connections))
await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_user_client_connections))
except:
print("waiting")
async def function_two_send_personal_message_to_user(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
except:
print("waiting for task..")
... ... ... ... ...
再往下是客户端连接的通道:
@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print_result = print("Received data: ".format(data))
send_data_back_to_user = await websocket.send_text(f"you sent message: data")
except WebSocketDisconnect as e:
print("client left chat, error = ", e)
现在的代码可以完美运行,性能也不错!但是,如果我在“send_data_back_to_user”行下添加一个异步 def 函数,例如:
await connection_manager.function_one_send_message_to_absolutely_everybody(data)
然后有一个巨大的延迟!这是为什么呢?
我正在玩耍并尝试了这个:
@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print_result = print("Received data: ".format(data))
send_data_back_to_user = await websocket.send_text(f"you sent message: data") # if i comment this line and the line underneath, and the speed is extremely fast!
the_asyncio_loop = asyncio.get_event_loop()
print_data_on_terminal = asyncio.gather(print_result)
return_data_back_to_user = asyncio.gather(send_data_back_to_user)
broadcast_msg = asyncio.gather(connection_manager.function_one_send_message_to_absolutely_everybody(data))
run_all_loops_together = asyncio.gather(print_data_on_terminal, return_data_back_to_user, broadcast_msg)
results = the_asyncio_loop.run_until_complete(run_all_loops_together)
print(results)
except WebSocketDisconnect as e:
print("client left chat, error = ", e)
但给了我错误:
TypeError: An asyncio.Future, a coroutine or an awaitable is required
有人可以帮我解决这个问题吗?
谢谢。
【问题讨论】:
await
让其他任务有时间在同一个线程上运行。如果您在 await
发生后注意到很多延迟,则您可能试图在 asyncio 正在使用的线程上做太多工作。
但是有解决办法吗?诸如 async.gather 之类的东西?如果不是,那么 asyncio 的意义何在?
我想我可以,但你的代码可能需要大量重构才能使用asyncio.Queue
。目前正在努力 - 拿着我的啤酒。
Asyncio 用于在单个线程上一次等待多个 IO 资源。
在这个阶段,低延迟对我的 websocket 和项目来说非常重要,如果绝对没有可能(我认为是这样),减少这个 asyncio.gather() 或 asyncio.future (),那么我可能需要考虑使用其他东西..
【参考方案1】:
如果我误解了你的任何解释,请告诉我。
据我了解,您在等待关注时在 websocket_endpoint
上遇到了一些延迟。
async def function_one_send_message_to_absolutely_everybody(self, message: str):
try:
await asyncio.sleep(2)
await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_webpage_client_connections))
await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_user_client_connections))
except:
print("waiting")
您正在等待asycnio.sleep
。这实际上会使您的 websocket_endpoint
从开始延迟 2 秒。
await
表示阻止当前执行直到awaitable
完成,并且安排 执行该awaitable
- 在那之前,将事件循环 控制权交还给main 并允许执行其他awaitable
。 Asyncio
是不平行,python 不要只是跳过 await something
并移动到下一行 - 它阻塞直到它完成。
要解决此问题,请先删除 await asyncio.sleep(2)
。但还有一个可能的问题:asyncio.gather
。
IO 操作很慢,您正在等待消息发送到所有现有连接。这也增加了websocket_endpoint
中主循环的延迟。
抽象地说,没有那个asyncio.sleep(2)
,你的代码就是这样做的:
# assume there is 101 connections including main
async def io_simulation():
await asyncio.sleep(0.1) # lets say net delay is 100ms
return 'Message'
async def sender():
while data := await io_simulation(): # receive data
# send message
await io_simulation()
# send to 100 users
await asyncio.gather(*(io_simulation() for _ in range(100)))
您正在等待 101 * 100ms 仅用于 IO 单独per 循环。如果向所有连接发送数据不如保持主循环足够响应重要,那么将其分成不同的任务。
以下是使用queue
进行分离任务传输的示例设置:
# mock-up code, not full code.
async def sender_queued(queue: asyncio.Queue):
receiver = get_receiver()
while data := await receiver():
await send_io_simulation(data)
await queue.put(data)
await queue.put(None)
async def send_mass(queue: asyncio.Queue):
while data := await queue.get():
await asyncio.gather((send_io_simulation(data, conn) for conn in all_connections))
def queued_driver_code():
async def driver():
queue = asyncio.Queue()
await asyncio.gather(sender_queued(queue), send_mass(queue))
asyncio.run(driver())
queue 最终会在数据流无限时堆积起来。在这种情况下,向两个任务添加压力逻辑以控制排队速度、丢弃/跳过一些数据、分离进程或只是像最初那样简单地等待,如果您不太关心主循环的响应性。
【讨论】:
以上是关于减少 Asyncio 中的延迟的主要内容,如果未能解决你的问题,请参考以下文章
使用 asyncio 将 bash 作为 Python 的子进程运行,但 bash 提示被延迟
深究Python中的asyncio库-asyncio简介与关键字