减少 Asyncio 中的延迟

Posted

技术标签:

【中文标题】减少 Asyncio 中的延迟【英文标题】:Reduce latency in Asyncio 【发布时间】:2020-10-01 13:27:14 【问题描述】:

我有一个使用 FastAPI 的 websocket 服务器启动并运行良好。

但是,当我使用那些“等待”时,我遇到了一些延迟问题。起初,我认为这与互联网连接或我的 linux 服务器有关。但似乎 asyncio 正在等待其他任务。

这是我的代码:

import asyncio
from pydantic import BaseModel

class UserClientWebSocket(BaseModel):
    id: str
    ws: WebSocket

    class Config:
        arbitrary_types_allowed = True



class ConnectionManager:
    def __init__(self):

        self.active_user_client_connections: List[UserClientWebSocket] = []
        self.collect_user_IDs = []




async def connect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
    await websocket.accept()
    await self.send_message_to_absolutely_everybody(f"User: THE_USER_ID connected to server!")
    print("User:  ".format(THE_USER_ID) + " Connected")
    if THE_USER_ID not in self.collect_user_IDs:
        self.collect_user_IDs.append(THE_USER_ID)
    else:
        await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: THE_USER_ID")
        await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
        self.collect_user_IDs.append(THE_USER_ID)
    self.active_user_client_connections.append(UserClientWebSocket(ws=websocket, id=THE_USER_ID))
    await self.show_number_of_clients()


async def function_one_send_message_to_absolutely_everybody(self, message: str):
    try:
        await asyncio.sleep(2)
        await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_webpage_client_connections))
        await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_user_client_connections))
    except:
        print("waiting")

async def function_two_send_personal_message_to_user(self, message: str, websocket: WebSocket):
    try:
        await websocket.send_text(message)
    except:
        print("waiting for task..")

... ... ... ... ...

再往下是客户端连接的通道:

@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_text()
            print_result = print("Received data:  ".format(data))

            send_data_back_to_user = await websocket.send_text(f"you sent message: data")

          

    except WebSocketDisconnect as e:
        print("client left chat, error = ", e)

现在的代码可以完美运行,性能也不错!但是,如果我在“send_data_back_to_user”行下添加一个异步 def 函数,例如:

await connection_manager.function_one_send_message_to_absolutely_everybody(data)

然后有一个巨大的延迟!这是为什么呢?

我正在玩耍并尝试了这个:

@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print_result = print("Received data:  ".format(data))

            send_data_back_to_user = await websocket.send_text(f"you sent message: data")   # if i comment this line and the line underneath, and the speed is extremely fast!

   
            the_asyncio_loop = asyncio.get_event_loop()

            print_data_on_terminal = asyncio.gather(print_result)
            return_data_back_to_user = asyncio.gather(send_data_back_to_user)
            broadcast_msg = asyncio.gather(connection_manager.function_one_send_message_to_absolutely_everybody(data))

            run_all_loops_together = asyncio.gather(print_data_on_terminal, return_data_back_to_user, broadcast_msg)

            results = the_asyncio_loop.run_until_complete(run_all_loops_together)

            print(results)


    except WebSocketDisconnect as e:
        print("client left chat, error = ", e)

但给了我错误:

TypeError: An asyncio.Future, a coroutine or an awaitable is required

有人可以帮我解决这个问题吗?

谢谢。

【问题讨论】:

await 让其他任务有时间在同一个线程上运行。如果您在 await 发生后注意到很多延迟,则您可能试图在 asyncio 正在使用的线程上做太多工作。 但是有解决办法吗?诸如 async.gather 之类的东西?如果不是,那么 asyncio 的意义何在? 我想我可以,但你的代码可能需要大量重构才能使用asyncio.Queue。目前正在努力 - 拿着我的啤酒。 Asyncio 用于在单个线程上一次等待多个 IO 资源。 在这个阶段,低延迟对我的 websocket 和项目来说非常重要,如果绝对没有可能(我认为是这样),减少这个 asyncio.gather() 或 asyncio.future (),那么我可能需要考虑使用其他东西.. 【参考方案1】:

如果我误解了你的任何解释,请告诉我。

据我了解,您在等待关注时在 websocket_endpoint 上遇到了一些延迟。

async def function_one_send_message_to_absolutely_everybody(self, message: str):
    try:
        await asyncio.sleep(2)
        await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_webpage_client_connections))
        await asyncio.gather(*(conn.ws.send_text(message) for conn in self.active_user_client_connections))
    except:
        print("waiting")

您正在等待asycnio.sleep。这实际上会使您的 websocket_endpoint 从开始延迟 2 秒。

await 表示阻止当前执行直到awaitable 完成,并且安排 执行该awaitable - 在那之前,将事件循环 控制权交还给main 并允许执行其他awaitableAsyncio不平行,python 不要只是跳过 await something 并移动到下一行 - 它阻塞直到它完成。

要解决此问题,请先删除 await asyncio.sleep(2)。但还有一个可能的问题:asyncio.gather

IO 操作很慢,您正在等待消息发送到所有现有连接。这也增加了websocket_endpoint 中主循环的延迟。

抽象地说,没有那个asyncio.sleep(2),你的代码就是这样做的:

# assume there is 101 connections including main

async def io_simulation():
    await asyncio.sleep(0.1)  # lets say net delay is 100ms
    return 'Message'

async def sender():
    while data := await io_simulation():  # receive data

        # send message
        await io_simulation()

        # send to 100 users
        await asyncio.gather(*(io_simulation() for _ in range(100)))

您正在等待 101 * 100ms 仅用于 IO 单独per 循环。如果向所有连接发送数据不如保持主循环足够响应重要,那么将其分成不同的任务。

以下是使用queue 进行分离任务传输的示例设置:

# mock-up code, not full code.

async def sender_queued(queue: asyncio.Queue):
    receiver = get_receiver()
    
    while data := await receiver():
        await send_io_simulation(data)
        await queue.put(data)

    await queue.put(None)


async def send_mass(queue: asyncio.Queue):
    while data := await queue.get():
        await asyncio.gather((send_io_simulation(data, conn) for conn in all_connections))


def queued_driver_code():
    async def driver():
        queue = asyncio.Queue()
        await asyncio.gather(sender_queued(queue), send_mass(queue))
    asyncio.run(driver())

queue 最终会在数据流无限时堆积起来。在这种情况下,向两个任务添加压力逻辑以控制排队速度、丢弃/跳过一些数据、分离进程或只是像最初那样简单地等待,如果您不太关心主循环的响应性。

【讨论】:

以上是关于减少 Asyncio 中的延迟的主要内容,如果未能解决你的问题,请参考以下文章

使用 asyncio 将 bash 作为 Python 的子进程运行,但 bash 提示被延迟

python异步编程模块asyncio学习

深究Python中的asyncio库-asyncio简介与关键字

Asyncio中的Task管理

Python中的Asyncio和Thread模块有啥区别[重复]

asyncio