Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

Posted 刘润森!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控相关的知识,希望对你有一定的参考价值。

@Author:Runsen

消息队列

消息队列让应用程序在用户请求之外异步执行称为任务的工作。如果应用程序需要在后台执行工作,它会将任务添加到任务队列中。这些任务稍后由工作服务执行。

Celery

Celery 是一个分布式任务队列,可帮助在后台异步执行大量进程/消息并进行实时处理。芹菜由三个主要成分组成:

  • Celery 客户端
  • 消息代理
  • Celery 工人

下图显示了组件之间如何交互的简化图。我们将使用 FastAPI 作为我们的 Celery 客户端和 RabbitMQ 作为消息代理。

  • 将Celery 客户将运行FastAPI应用程序,并会发出消息/后台作业的RabbitMQ。

  • RabbitMQ 将作为消息代理来调解客户端和工作线程之间的消息。

  • RabbitMQ 收到客户端的消息后,会通过将消息发送给 celery worker 来启动客户端任务。

  • 一个celery 工人被认为是将实现在任何Web服务器的请求的异步后台任务。可以有多个工人一次执行/完成许多任务。

  • celery 将确保每个 worker 一次只执行一个任务,并且每个任务只由一个 worker 分配。

FastAPI 是一个现代、快速(高性能)的 Web 框架,

安装 fastapi包

pip install fastapi

安装 celery 包

pip install celery

我们还需要安装 ASGI 服务器来运行我们的 FastAPI 应用程序。

pip install uvicorn

在我们的本地机器上运行 RabbitMQ 的最简单方法之一是使用 Docker。

Docker安装查看:https://docs.docker.com/get-docker/

运行以下命令即可通过终端中的 docker 启动 RabbitMQ 映像。

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3

如果没有安装RabbitMQ 映像,会直接下载安装。

创建 Celery Worker 任务

现在,一旦消息代理RabbitMQ 运行,就可以创建Worker 程序。

这是因为 celery Worker 会侦听消息代理以执行排队的任务。

在这一部分,我们将为 celery worker 创建任务。创建一个文件celery_worker.py:

from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger
#初始Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
#创建记录器-启用以在任务记录器上显示消息
celery_log = get_task_logger(__name__)
#创建订单-与芹菜异步运行
#长时间运行任务的示例流程
@celery.task
def create_order(name, quantity):
    # 每1个订单5秒
    complete_time_per_item = 5
    # 根据订购的物品数量不断增加
    sleep(complete_time_per_item * quantity)
    
    celery_log.info(f"Order Complete!")
    return {"message": f"Hi {name}, Your order has completed!",
            "order_quantity": quantity}

新建一个model.py

from pydantic import BaseModel
#请求主体的订单类模型
class Order(BaseModel):
    customer_name: str
    order_quantity: int

然后,创建一个名为main.py的新文件:

from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# 创建FastAPI应用程序
app = FastAPI()
# 创建订单
@app.post('/order')
def add_order(order: Order):
    # 使用delay=() 方法调用芹菜任务
    create_order.delay(order.customer_name, order.order_quantity)
    return {"message": "Order Received! Thank you for your patience."}

运行 FastAPI 应用程序:

uvicorn main:app --reload

访问http://localhost:8000/docs

以查看 Swagger 文档中正在运行的 FastAPI。

celery -A celery_worker.celery worker --loglevel=info

可以通过插入请求正文输入来测试我们的端点。这是请求正文的示例输入:

单击“Execute”以从端点获取响应,将看到以下结果:

有没有什么有效的方法来监控后台任务?

我们可以使用Flower 监控工具来监控我们所有的Celery 工人。

pip install flower

然后,flower在我们的本地机器上运行服务器:

celery flower -A celery_worker.celery --broker:amqp://localhost//

访问http://localhost:5555/。

Flower 服务器并单击导航栏上的“Tasks”选项卡来监控我们的 celery 工人。

以上是关于Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控的主要内容,如果未能解决你的问题,请参考以下文章

Docker,Celery,组合方法失败

Celery+rabbitmq+mysql+flower

flower 时区设置

如何向 Django Celery Flower Monitoring 添加身份验证和端点?

flower 指定app

我可以将 Celery Flower 配置为在关闭 Unix shell 后运行吗?