消息队列:celery
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列:celery相关的知识,希望对你有一定的参考价值。
参考技术ACelery的架构由三部分组成, 消息中间件(message broker) , 任务执行单元(worker) 和 任务执行结果存储(task result store) 组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ,Redis,MongoDB(experimental),Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 AMQP,redis,memcached,mongodb,SQLAlchemy, Django ORM,Apache Cassandra,IronCache 等。
这里我先不去看它是如何存储的,就先选用redis来存储任务执行结果。
因为涉及到消息中间件(在Celery帮助文档中称呼为中间人<broker>),为了更好的去理解文档中的例子,可以安装两个中间件,一个是RabbitMQ,一个redis。
根据 Celery的帮助文档 安装和设置RabbitMQ, 要使用 Celery,需要创建一个 RabbitMQ 用户、一个虚拟主机,并且允许这个用户访问这个虚拟主机。
异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
python celery 多work多队列
1.Celery模块调用
既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。
如果要说celery的分布式应用的话,就要提到celery的消息路由机制,AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去。
多worker,多队列,实例:
1.在服务器上编写文件tasks.py。首先定义一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。
#!/usr/bin/env
#-*-conding:utf-8-*-
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
app = Celery()
app.config_from_object("celeryconfig")
@app.task
def tashA(x,y):
return x*y
@app.task
def taskB(x,y,z):
return x+y+z
@app.task
def add(x,y):
return x+y
2.编写celeryconfig.py文件。
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from kombu import Exchange,Queue
from celery import platforms
platforms.C_FORCE_ROOT = True
BROKER_URL = "redis://localhost:6379/7"
CELERY_RESULT_BACKEND = "redis://localhost:6379/8"
CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)
CELERY_ROUTES = {
‘tasks.taskA‘:{"queue":"for_task_A","routing_key":"for_task_A"},
‘tasks.taskB‘:{"queue":"for_task_B","routing_key":"for_task_B"}
}
3.启动worker来指定task
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B
4.传入参数
将上面两个文件导出到pycharm中:
编写文件传参:
from tasks import * re1 = taskA.delay(100, 200) re2 = taskB.delay(1,2, 3)
print(re3.status) #查看re3的状态 print(re3.id) #查看re3的id
运行之后可见:taskA,taskB都已正常执行。
5.我们可以看到add(re3)的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。
celery -A tasks worker -l info -n worker.%h -Q celery
这样我们再次运行pycharm就可以看见add也被运行了,并且redis数据库中也有该id了。
2.Celery与定时任务
1.在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:
CELERY_TIMEZONE = ‘UTC‘
CELERYBEAT_SCHEDULE = {
‘taskA_schedule‘ : {
‘task‘:‘tasks.taskA‘,
‘schedule‘:20,
‘args‘:(5,6)
},
‘taskB_scheduler‘ : {
‘task‘:"tasks.taskB",
"schedule":200,
"args":(10,20,30)
},
‘add_schedule‘: {
"task":"tasks.add",
"schedule":10,
"args":(1,2)
}
}
2.Celery启动定时任务
celery -A tasks worker -l info -n workerA.%h -Q for_task_A -B
启动完成后:
taskA每20秒执行一次taskA.delay(5, 6)
taskB每200秒执行一次taskB.delay(10, 20, 30)
Celery每10秒执行一次add.delay(1, 2)
以上是关于消息队列:celery的主要内容,如果未能解决你的问题,请参考以下文章
消息队列(kafka/nsq 等)与任务队列(celery)到底有啥不同
Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控