Celery+Rabbitmq实现异步任务

Posted 小斌哥ge

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery+Rabbitmq实现异步任务相关的知识,希望对你有一定的参考价值。


Celery+Rabbitmq实现异步任务

一. 安装celery,再安装rabbitmq或redis

pip install celery

rabbitmq和redis安装其中一个就可以了,celery官方文档里说用两个都可以,但优先推荐rabbitmq,具体怎么安装可以自己找一下教程.

二. 搭建celery任务架构

  1. 在项目中适合的位置创建一个celery_tasks目录,在这个目录下写celery的代码,将celery代码与项目
    业务逻辑代码独立开.当然也可以不分开,具体根据项目的代码量和实际需要来使用.
    注意:目录名不要直接叫celery,不要与python关键字,第三方模块的名字冲突,否则导致导包出错
  2. ​在新建的目录下创建config.py​​​,​​tasks.py​​,main.py三个python文件分别用于编写celery的配置代码,任务函数代码和任务启动代码
# 目录结构
- celery_tasks
- config.py
- main.py
- tasks.py

三. 编写代码实现异步调用任务

  1. ​config.py​
from celery import Celery


# 创建celery对象app,demo是对celery对象的命名,自定义,见名知义即可
# broker指定后端代理,可以使用mq或redis,主要起到任务队列的作用
app = Celery(demo, broker=amqp://guest@localhost:5672//)
# app = Celery(demo, broker=redis://127.0.0.1:6379/15)
  1. ​tasks.py​
from config import app


# 定义任务,使用celery对象.task装饰任务,celery即可自动识别任务
@app.task(name=celery_task1_name)
def celery_task1_name(arg):
print(编写需要执行的任务代码, arg)


@app.task(name=celery_task2_name)
def celery_task2_name():
print(将需要执行的代码导入tasks.py文件,然后在这里调用即可)
  1. ​main.py​
from tasks import *


# 设置celery对象自动识别任务,celery_tasks指定tasks.py的目录,保证程序能找到tasks.py
app.autodiscover_tasks([celery_tasks])

四. 启动celery任务
找到main.py所在目录下,执行如下命令,如果不在此目录,则main前要写相对路径,如:celery_tasks.main

celery -A main worker -l info

-A 指定celery的启动入口main,worker为celery执行任务的后端工人,-l指定日志级别为info
执行成功后,celery就会启动worker,从代理队列中获取任务并执行,如果任务队列为空,则一直等待到有任务

Windows Bug:如果Celery4.0以上的版本在Windows上使用,通过上面的启动命令启动,在执行task.delay()时会报错:ValueError: not enough values to unpack (expected 3, got 0)
Linux不会出现此问题,Windows才有,与“绿色线程”有关,具体阅读eventlet相关资料
解决办法:

  1. 安装eventlet
pip install eventlet
  1. 启动worker时增加-P eventlet参数
celery -A main worker -l info -P eventlet

五. 调用celery异步执行任务
在需要执行异步任务的地方导入任务,使用task.delay(参数)调用任务
如:与celery_tasks目录同级的demo目录下有一个demo.py文件,我在demo.py中异步执行任务

from celery_tasks.main import celery_task1_name, celery_task2_name


def demo_func(a):
# 调用格式:任务名.delay(参数)
celery_task1_name.delay(a)
print(celery_task1_name执行完成:!.format(a))
celery_task2_name.delay()
print(celery_task1_name执行完成!)


demo_func(hello celery!)

Celery+Rabbitmq实现异步任务_Celery+Rabbitmq


以上是关于Celery+Rabbitmq实现异步任务的主要内容,如果未能解决你的问题,请参考以下文章

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

python测试开发django-159.Celery 异步与 RabbitMQ 环境搭建

Celery 基本使用

Django配置Celery执行异步和同步任务(tasks))

django+celery+rabbitmq实践