Celery的使用
Posted wang-kai-xuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery的使用相关的知识,希望对你有一定的参考价值。
Celery的基本使用
#启动celery的worker
celery -A proj worker -l info -P eventlet -Q queue
#-A 指定项目
#-l 日志级别
#-P windows系统下需要加该参数,否则报错
#-Q 指定队列
#后台启动celery
celery multi start 集群名(任意) -A proj -n worker名称
#stop停止 restart重启
#发布任务,会监控配置文件中配置的所有任务策略,按照任务策略下发任务
celery -A proj beat
#查看celery指令的帮助
celery --help
#查看celery下的指令的帮助信息,如worker命令
celery worker --help
#flower的使用
celery flower --broker=redis://localhost:6379/0 --port=6555
#django-celery-beat动态监视任务变化
celery -A django_celery_demo beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
多队列任务
celery可用于处理分布式任务。对于开销不同的任务可以放到性能不同的机器上去执行。基本原理就是在celery中配置多个队列,每个队列用来用来存储不同的任务。在多台机器上分别指定队列来启动worker,该worker只会去消费指定队列中的任务。
主要配置信息如下:
from kombu import Queue, Exchange
# 默认交换机
default_exchange = Exchange('default', type='direct')
# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')
# 定义一个打印交换机
print_exchange = Exchange('print', type='direct')
# 定义一个计算交换机
calc_exchange = Exchange('calc', type='direct')
# 创建五个队列,一个是默认队列,一个是videos、一个images、一个prints、一个calcs
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image'),
Queue('prints', print_exchange, routing_key='task.print'),
Queue('calcs', calc_exchange, routing_key='task.calc'),
)
# 定义默认队列和默认的交换机routing_key,如果没有为任务添加路由,则放入默认队列
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# 设置队列路由,交换机会根据routing_key将任务推入不同的队列
CELERY_ROUTES = ({'celery_task.media_task.image_compress': {
'queue': 'images',
'routing_key': 'media.image'
}}, {'celery_task.media_task.video_upload': {
'queue': 'videos',
'routing_key': 'media.video'
}}, {'celery_task.media_task.video_compress': {
'queue': 'videos',
'routing_key': 'media.video'
}}, {'celery_task.print_task.hello': {
'queue': 'prints',
'routing_key': 'task.print'
}}, {'celery_task.calc_task.add': {
'queue': 'calcs',
'routing_key': 'task.calc'
}},)
#也可以在创建任务时为任务指定队列
from celery_task.main import app
#该任务会被放入print队列中
@app.task(queue='print')
def hello():
print('hello')
指定队列启动worker
celery -A celery_task.main worker -Q print
定时任务
定时任务根据配置文件中制定的执行策略来定期执行任务
配置信息
from celery.schedules import crontab
from datetime import timedelta
# 需要定期执行任务的配置
CELERYBEAT_SCHEDULE = {
# 定时任务一
'opt_main.opt.aps_task1': {
# 指定任务的路径,任务如果是一个类的方法,只需要指定类所在的文件即可,不需要指定类
#如 文件夹.文件.方法
'task': 'celery_task.opt_main.aps_task1',
# 每两分钟执行一次
'schedule': crontab(minute='*/2'),
# 传递的参数
'args': ('123',)
},
# 定时任务二
'opt_main.opt.aps_task2': {
# 执行print_task下的hello函数
'task': 'celery_task.opt_main.aps_task2',
# 每小时执行一次
'schedule': crontab(hour='*/1'),
'args': ('123',)
},
# 定时任务三
'opt_main.opt.aps_task3': {
# 执行print_task下的hello函数
'task': 'celery_task.opt_main.aps_task3',
# 每隔1秒执行一次
'schedule': timedelta(seconds=1),
'args': ('123',)
},
}
# 导入任务模块,任务所在的路径
CELERY_IMPORTS = ['celery_task.print_task', 'celery_task.media_task', 'celery_task.calc_task', 'celery_task.opt_main']
celery的crontab的使用
crontab是celery提供给我们帮我们指定任务执行时间策略的类。
#crontab实例化参数主要有5个
- minute 分钟,范围0-59
- hour 小时,范围0-23
- day_of_week 星期几 范围0-6,以星期天开始,0位星期天,还可以用英文缩写表示sun代表周日
- day_of_month 每月的第几号,范围1-31
- month_of_year 月份,范围1-12
示例
#默认参数如下,即每分钟执行一次
crontab((minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')
#每小时的第10分钟执行任务
crontab(minute=10)
#也可以指定多个值,如每小时的第10和30分钟执行任务
crontab(minute='10,30')
#每天的0点0分执行任务
crontab(minute=0,hour=0)
#可以设置范围
#9点到12点每个小时的每分钟执行任务
crontab(minute='*', hour='9-12')
#9点到12点和20点中每分钟执行任务
crontab(hour='9-12,20')
#设置步长
#每两分钟执行一次任务
crontab(minute='*/2')
#每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')
通过celery的beat指令,扫描发布项目下所有的定时任务
celery -A celery_task.main beat
异步任务
from celery_task.main import app
@app.task
def hello(s):
print(s)
通过以下代码执行任务
from ... import hello
hello.apply_async((s,)) #调用任务
celery中遇到的一些问题
celery将任务生产到broker的消息队列中,使用kombu模块对任务进行序列化后存储,默认使用json格式进行序列化和反序列化。但在项目中,很多时候通过类的方法来创建任务。所以每个任务都要传入一个对象参数。json无法序列化python对象,所以这里改用pickle对任务进行序列化和反序列化。
配置参数如下
# celery消息的序列化方式,由于要把对象当做参数所以使用pickle
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
注意:pickle不能序列化一些锁对象和socket对象,所以当python对象参数包含这些对象时序列化会报错。可以更改在任务内部在创建这些对象。如redis连接对象,可改为传入连接的host等参数,在任务内部进行连接。
celery+django
django-celery-beat
django-celery-beat模块是一个用来动态管理定时任务的模块。该模块有6张表。可以通过django-admin来进行管理。原理是通过django的信号动态监控数据库中的数据是否发生变化,任务调度器beat会在数据发生变化后按照新的任务策略下发任务。
- clockedschedule 任务执行时刻策略
- crontabschedule 任务执行计划策略
- intervalschedule 任务执行间隔策略
- solarschedule 经纬度执行策略
- periodtask 任务表
- periodtasks 这张表只有一条记录,用来监视其他五张表的变化
pip install django-celery-beat #安装模块
#在django的settings.py中注册该app
INSTALLED_APPS = [
...
'django_celery_beat',
...
]
#执行数据库迁移命令生成数据库表
python manage.py migrate
settings.py中关于celery的一些配置
# celery配置
CELERY_BROKER_URL = 'redis://192.168.16.22:6379/5' # Broker配置,使用Redis作为消息中间件
CELERY_TIMEZONE = 'Asia/Shanghai' # 设置时区
CELERY_ENABLE_UTC = False # 禁止使用UTC时间
DJANGO_CELERY_BEAT_TZ_AWARE = False # 官方用来修复CELERY_ENABLE_UTC=False and USE_TZ = False 时时间比较错误的问题
CELERY_RESULT_BACKEND = 'redis://192.168.16.22:6379/6' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
# 默认交换机
default_exchange = Exchange('default', type='direct')
# 定义一个计算交换机
calc_exchange = Exchange('calc', type='direct')
# 创建两个队列,一个是默认队列,一个calcs
CELERY_QUEUES = (
Queue('default', default_exchange, routing_key='default'),
Queue('calcs', calc_exchange, routing_key='task.calc'),
)
# 定义默认队列和默认的交换机routing_key,如果没有为任务添加路由,则放入默认队列
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# 设置队列路由,交换机会根据routing_key将任务推入不同的队列
CELERY_ROUTES = ({'celery_task.add': {
'queue': 'calcs',
'routing_key': 'task.calc'
}},)
官方说明:https://pypi.org/project/django-celery-beat/
以上是关于Celery的使用的主要内容,如果未能解决你的问题,请参考以下文章
celery beat 没有发送消息(使用 django-celery-beat)