Asyncio:带有 aio-pika 的 Fastapi,消费者忽略 Await

Posted

技术标签:

【中文标题】Asyncio:带有 aio-pika 的 Fastapi,消费者忽略 Await【英文标题】:Asyncio: Fastapi with aio-pika, consumer ignores Await 【发布时间】:2021-01-28 15:27:00 【问题描述】:

我正在尝试将我的 websocket 端点与 rabbitmq (aio-pika) 挂钩。目标是让该端点中的侦听器和来自队列的任何新消息通过 websockets 将消息传递给浏览器客户端。

我在带有 asyncio 循环的脚本中使用 asyncio 测试了消费者。按照我的方式工作并使用 aio-pika 文档。 (来源:https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/2-work-queues.htmlworker.py

但是,当我在 websockets 端点的 fastapi 中使用它时,我无法使其工作。不知何故,听众:

await queue.consume(on_message)

被完全忽略。

这是我的尝试(我把它全部放在一个函数中,所以它更具可读性):

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    print("Entering websockets")
    await manager.connect(websocket)
    print("got connection")

    # params
    queue_name = "task_events"
    routing_key = "user_id.task"


    con = "amqp://rabbitmq:rabbitmq@rabbit:5672/"
    connection = await connect(con)
    channel = await connection.channel()


    await channel.set_qos(prefetch_count=1)

    exchange = await channel.declare_exchange(
        "topic_logs",
        ExchangeType.TOPIC,
    )

    # Declaring queue
    queue = await channel.declare_queue(queue_name)

    # Binding the queue to the exchange
    await queue.bind(exchange, routing_key)

    async def on_message(message: IncomingMessage):
        async with message.process():
            # here will be the message passed over websockets to browser client
            print("sent", message.body)

    

    
    try:
        
        ######### Not working as expected ###########
        # await does not await and websockets finishes, as there is no loop
        await queue.consume(on_message) 
        #############################################

        ################ This Alternative code atleast receives some messages #############
        # If I use this part, I atleast get some messages, when I trigger a backend task that publishes new messages to the queue. 
        # It seems like the messages are somehow stuck and new task releases all stucked messages, but does not release new one. 
        while True: 
            await queue.consume(on_message)
            await asyncio.sleep(1)
        ################## one part #############

    except WebSocketDisconnect:
        manager.disconnect(websocket)

我对 python 中的异步非常陌生。我不确定问题出在哪里,在从 aio-pika 获得 worker.py 的启发时,我无法以某种方式实现异步消耗循环。

【问题讨论】:

【参考方案1】:

解决办法很简单。

aio-pika queue.consume 即使我们使用 await 也是非阻塞的,所以 这样我们消费

consumer_tag = await queue.consume(on_message, no_ack=True)

在连接结束时我们取消

await queue.cancel(consumer_tag)

对我来说,解决方案的核心是让异步阻塞,所以我使用 消费

之后的这部分代码
while True:
    data = await websocket.receive_text()
    x = await manager.send_message(data, websocket)

我不使用此代码,但它很有用,因为这部分代码等待前端 websocket 响应。如果这部分代码丢失,那么客户端连接只是为了断开连接(成功执行 websocket 端点),因为没有任何阻塞

【讨论】:

【参考方案2】:

您可以使用异步迭代器,这是从队列中消费消息的第二种规范方式。

在你的情况下,这意味着:

async with queue.iterator() as iter:
  async for message in iter:
    async with message.process():
      # do something with message

只要没有收到消息就会阻塞,处理完消息后会再次挂起。

【讨论】:

以上是关于Asyncio:带有 aio-pika 的 Fastapi,消费者忽略 Await的主要内容,如果未能解决你的问题,请参考以下文章

等效于带有工作“线程”的 asyncio.Queues

带有 difflib 的 Python Asyncio 缓慢爬行

简单的 Python 多线程网络服务器,带有 Asyncio 和在主函数中调用的事件

在超时中包装 asyncio.gather

PIL 和使用 asyncio 阻塞调用

Python TypeError:“_asyncio.Future”对象不可下标