Celery的使用

Posted wang-kai-xuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery的使用相关的知识,希望对你有一定的参考价值。

Celery的基本使用

#启动celery的worker
celery -A proj worker -l info -P eventlet -Q queue
#-A 指定项目
#-l 日志级别
#-P windows系统下需要加该参数,否则报错
#-Q 指定队列

#后台启动celery
celery multi start 集群名(任意) -A proj -n worker名称
#stop停止  restart重启

#发布任务,会监控配置文件中配置的所有任务策略,按照任务策略下发任务
celery -A proj beat

#查看celery指令的帮助
celery --help
#查看celery下的指令的帮助信息,如worker命令
celery worker --help
#flower的使用
celery flower --broker=redis://localhost:6379/0 --port=6555
#django-celery-beat动态监视任务变化
celery -A django_celery_demo beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

多队列任务

celery可用于处理分布式任务。对于开销不同的任务可以放到性能不同的机器上去执行。基本原理就是在celery中配置多个队列,每个队列用来用来存储不同的任务。在多台机器上分别指定队列来启动worker,该worker只会去消费指定队列中的任务。

主要配置信息如下:

from kombu import Queue, Exchange

# 默认交换机
default_exchange = Exchange('default', type='direct')
# 定义一个媒体交换机,类型是直连交换机
media_exchange = Exchange('media', type='direct')
# 定义一个打印交换机
print_exchange = Exchange('print', type='direct')
# 定义一个计算交换机
calc_exchange = Exchange('calc', type='direct')

# 创建五个队列,一个是默认队列,一个是videos、一个images、一个prints、一个calcs
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image'),
    Queue('prints', print_exchange, routing_key='task.print'),
    Queue('calcs', calc_exchange, routing_key='task.calc'),
)
# 定义默认队列和默认的交换机routing_key,如果没有为任务添加路由,则放入默认队列
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# 设置队列路由,交换机会根据routing_key将任务推入不同的队列
CELERY_ROUTES = ({'celery_task.media_task.image_compress': {
    'queue': 'images',
    'routing_key': 'media.image'
}}, {'celery_task.media_task.video_upload': {
    'queue': 'videos',
    'routing_key': 'media.video'
}}, {'celery_task.media_task.video_compress': {
    'queue': 'videos',
    'routing_key': 'media.video'
}}, {'celery_task.print_task.hello': {
    'queue': 'prints',
    'routing_key': 'task.print'
}}, {'celery_task.calc_task.add': {
    'queue': 'calcs',
    'routing_key': 'task.calc'
}},)
#也可以在创建任务时为任务指定队列
from celery_task.main import app

#该任务会被放入print队列中
@app.task(queue='print')
def hello():
    print('hello')

指定队列启动worker

celery -A celery_task.main worker -Q print

定时任务

定时任务根据配置文件中制定的执行策略来定期执行任务

配置信息

from celery.schedules import crontab
from datetime import timedelta

# 需要定期执行任务的配置
CELERYBEAT_SCHEDULE = {
    # 定时任务一
    'opt_main.opt.aps_task1': {
        # 指定任务的路径,任务如果是一个类的方法,只需要指定类所在的文件即可,不需要指定类
        #如 文件夹.文件.方法
        'task': 'celery_task.opt_main.aps_task1',
        # 每两分钟执行一次
        'schedule': crontab(minute='*/2'),
        # 传递的参数
        'args': ('123',)
    },
    # 定时任务二
    'opt_main.opt.aps_task2': {
        # 执行print_task下的hello函数
        'task': 'celery_task.opt_main.aps_task2',
        # 每小时执行一次
        'schedule': crontab(hour='*/1'),
        'args': ('123',)
    },
    # 定时任务三
    'opt_main.opt.aps_task3': {
        # 执行print_task下的hello函数
        'task': 'celery_task.opt_main.aps_task3',
        # 每隔1秒执行一次
        'schedule': timedelta(seconds=1),
        'args': ('123',)
    },
}

# 导入任务模块,任务所在的路径
CELERY_IMPORTS = ['celery_task.print_task', 'celery_task.media_task', 'celery_task.calc_task', 'celery_task.opt_main']

celery的crontab的使用

crontab是celery提供给我们帮我们指定任务执行时间策略的类。

#crontab实例化参数主要有5个
    - minute  分钟,范围0-59
    - hour   小时,范围0-23
    - day_of_week  星期几 范围0-6,以星期天开始,0位星期天,还可以用英文缩写表示sun代表周日
    - day_of_month  每月的第几号,范围1-31
    - month_of_year  月份,范围1-12

示例

#默认参数如下,即每分钟执行一次
crontab((minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')
        
#每小时的第10分钟执行任务
crontab(minute=10)
#也可以指定多个值,如每小时的第10和30分钟执行任务
crontab(minute='10,30')
#每天的0点0分执行任务
crontab(minute=0,hour=0)
        
#可以设置范围
#9点到12点每个小时的每分钟执行任务
crontab(minute='*', hour='9-12')
#9点到12点和20点中每分钟执行任务
crontab(hour='9-12,20')
        
#设置步长
#每两分钟执行一次任务
crontab(minute='*/2')
#每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')

通过celery的beat指令,扫描发布项目下所有的定时任务

celery -A celery_task.main beat

异步任务

from celery_task.main import app


@app.task
def hello(s):
    print(s)

通过以下代码执行任务

from ... import hello

hello.apply_async((s,)) #调用任务

celery中遇到的一些问题

celery将任务生产到broker的消息队列中,使用kombu模块对任务进行序列化后存储,默认使用json格式进行序列化和反序列化。但在项目中,很多时候通过类的方法来创建任务。所以每个任务都要传入一个对象参数。json无法序列化python对象,所以这里改用pickle对任务进行序列化和反序列化。

配置参数如下

# celery消息的序列化方式,由于要把对象当做参数所以使用pickle
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'

注意:pickle不能序列化一些锁对象和socket对象,所以当python对象参数包含这些对象时序列化会报错。可以更改在任务内部在创建这些对象。如redis连接对象,可改为传入连接的host等参数,在任务内部进行连接。

celery+django

官方说明:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django

django-celery-beat

django-celery-beat模块是一个用来动态管理定时任务的模块。该模块有6张表。可以通过django-admin来进行管理。原理是通过django的信号动态监控数据库中的数据是否发生变化,任务调度器beat会在数据发生变化后按照新的任务策略下发任务。

  • clockedschedule 任务执行时刻策略
  • crontabschedule 任务执行计划策略
  • intervalschedule 任务执行间隔策略
  • solarschedule 经纬度执行策略
  • periodtask 任务表
  • periodtasks 这张表只有一条记录,用来监视其他五张表的变化
pip install django-celery-beat   #安装模块

#在django的settings.py中注册该app
INSTALLED_APPS = [
    ...
    'django_celery_beat',
    ...
]

#执行数据库迁移命令生成数据库表
python manage.py migrate

settings.py中关于celery的一些配置

# celery配置

CELERY_BROKER_URL = 'redis://192.168.16.22:6379/5'  # Broker配置,使用Redis作为消息中间件

CELERY_TIMEZONE = 'Asia/Shanghai'  # 设置时区

CELERY_ENABLE_UTC = False  # 禁止使用UTC时间

DJANGO_CELERY_BEAT_TZ_AWARE = False  # 官方用来修复CELERY_ENABLE_UTC=False and USE_TZ = False 时时间比较错误的问题

CELERY_RESULT_BACKEND = 'redis://192.168.16.22:6379/6'  # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案

# 默认交换机
default_exchange = Exchange('default', type='direct')
# 定义一个计算交换机
calc_exchange = Exchange('calc', type='direct')

# 创建两个队列,一个是默认队列,一个calcs
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('calcs', calc_exchange, routing_key='task.calc'),
)
# 定义默认队列和默认的交换机routing_key,如果没有为任务添加路由,则放入默认队列
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# 设置队列路由,交换机会根据routing_key将任务推入不同的队列
CELERY_ROUTES = ({'celery_task.add': {
    'queue': 'calcs',
    'routing_key': 'task.calc'
}},)

官方说明:https://pypi.org/project/django-celery-beat/

以上是关于Celery的使用的主要内容,如果未能解决你的问题,请参考以下文章

在 Celery 中使用 Python 标准日志记录

django入门 celery使用

celery beat 没有发送消息(使用 django-celery-beat)

Python爬虫之使用celery加速爬虫

python 一些代码使用模式作为SQL-Alchemy的声明基础,以及对Celery分支的支持。

django+celery实现异步任务