FastAPI如何将ZMQ添加到事件循环

Posted

技术标签:

【中文标题】FastAPI如何将ZMQ添加到事件循环【英文标题】:FastAPI how to add ZMQ to eventloop 【发布时间】:2020-09-06 19:11:39 【问题描述】:

我很惊讶这并没有真正被详细询问,但由于某种原因,我无法在任何地方找到这个问题或解决方案。似乎很多人都遇到了一个问题,即您有一个 fastAPI 应用程序,该应用程序还需要与其他一些微服务进行通信(以比 http 消息更有效的方式)。我已经阅读了所有关于集成到 asyncio 中的 zmq 文档,但到目前为止,我还没有找到任何关于如何使用 fastapi(甚至是 starlette)将 zmq 添加到事件循环中的任何信息。这是zmq网站的代码示例:

import asyncio
import zmq
from zmq.asyncio import Context

ctx = Context.instance()

async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = await s.recv_multipart()
        print('received', msg)
    s.close() 

这向我们展示了一个异步函数,这很棒,但再次需要在事件循环中与 fastAPI 协程一起运行。这应该怎么做?除了后台任务之外,fastAPI 文档并没有真正为我们提供任何接口来运行单独的协程。我不确定后台任务中是否发生了任何其他魔法,但对于需要与另一个微服务通信的东西,我希望它具有类似于 fastAPI 协程的调度。此外,您无法在启动时启动后台任务,因此您必须进行一些欺骗性的调用才能使其运行(这很hacky ..但从技术上讲是可行的)。此外,如果我们可以使用类似的东西注册一个处理程序会更好

@app.set("zmq_recv)
async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')....

这将基于某处的配置,允许 zmq 上下文中的所有消息自动转到此函数。这可能允许我们在 fastAPI 协程中运行 zmq,只需绑定另一个端口,并确保来自该端口的所有流量都流向这个特殊的 app.set 方法。像这样的东西我会很好的......

ctx = Context.instance()

@app.on_event("startup")
async def startup_event():
    s = ctx.socket(zmq.PULL)
    s.bind('tcp://127.0.0.1:5555').setHandler("zmq_recv") # this setHandler is something magic that tells fastAPI to have all traffic on port 5555 to go to this handler function

@app.on_special_handler("zmq_recv")
async def zmq_recv(socket):
    msg = await socket.recv_multipart()
    print('received', msg)
    

理想情况下,这是我想要的,使用 fastAPI 协程......但是让 zmq 套接字调用对特定函数的响应。是否可以创建协程示例或这种模板化示例?如果不是,人们如何声称您可以有效地将 zmq 与 fastapi 一起使用? (我猜后台任务可以解决问题.. 但它看起来真的很作弊)

【问题讨论】:

我不太确定您的问题想要什么。你想在你的 asgi application worker 中接收来自 zmq 的消息吗? 我想在事件循环中接收消息,这不会阻塞 fastapi 协程。或者在 fastapi 协程本身内部。 【参考方案1】:

我实际上正在寻求做类似的事情,专门用于通过非 http 处理程序侦听来自消息队列的事件。 我当前的解决方案是连接到事件循环并在那里添加侦听器。 它可能看起来像这样:(取自 fastapi 中的这个问题 -> issue)

loop = asyncio.get_event_loop()

loop.create_task(serve(app, config))  # run fastapi
loop.create_task(your_tcp_app()) # run your app
loop.run_forever()  # start event loop

您还可以在初始化后调用应用程序正在使用的事件循环,这与您最初所做的更相似:

@app.on_event("startup")
async def startup_event():
    loop = asyncio.get_event_loop() # should return the loop fastapi is already using
    loop.create_task(your_tcp_app()) # run your app

我正在考虑这个解决方案,因为我害怕 python 中的线程管理,因为这将存在于生产服务中,我可能最终将两个进程完全分开以使事情变得更简单...... 我想知道这是否对您有用,或者其他人是否对这种实现有任何想法:)

【讨论】:

以上是关于FastAPI如何将ZMQ添加到事件循环的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 应用程序中使用两个事件循环

js如何循环添加点击事件

在循环中单击另一个元素时将事件侦听器添加到元素

如何将应用程序定义的消息发送到 SDL 中的事件循环?

如何将 lambda 函数排队到 Qt 的事件循环中?

事件循环和任务队列