Celery配置

Posted maxaimee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery配置相关的知识,希望对你有一定的参考价值。

Celery 配置

最近项目中需要做一些定时任务的工作,之前都是用 LInux 的Crontab 但是任务多了之后 不好维护也没有什么监控的措施。所以考虑使用Celery 来解决这一问题。

1.安装

pip install celery-with-redis

注意:其实celery 支持多中broker

Name Status Monitoring Remote Control
RabbitMQ Stable Yes Yes
Redis Stable Yes Yes
Amazon SQS Stable No No
Zookeeper Experimental No No

可以看见redis 和RabbitMq支持的是最好的,其中官方推荐RabbitMq,因为redis有断电或者重启丢失数据的风险。不过在这里我因为方便使用的是redis

2 配置

安装之后开始写配置文件 celeryconfig.py

BROKER_URL = ‘redis://127.0.0.1:6379/1‘ #消息队列选用
CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/0‘#结果存储
CELERY_TASK_SERIALIZER = ‘msgpack‘#任务序列化方式
CELERY_RESULT_SERIALIZER = ‘json‘#结果序列化方式
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = [‘json‘, ‘msgpack‘]#接收序列化方式,masgpack序列化的方式效率很高。
创建celery 实例
app = Celery(‘app‘, include=[‘YoutProjectName.tasks‘])

app.config_from_object(‘YoutProjectName.celeryconfig‘)  # 设置环境变量

3运行代码

创建YouProjName.tasks.py

# -*- coding: utf-8 -*-
#因为celery 不支持隐式导入
from __future__ import absolute_import, unicode_literals
import random
#引入app实例
from CronTab import celery_app
#引入日志类
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

#celery 装饰器,可以在运行的时候讲任务加到队列 这里是注册,将任务加到了注册中心,这样才能消费者才能收到
@celery_app.task()
def mul(x, y):
    total = x * (y * random.randint(3, 100))
    return total


@celery_app.task()
def xsum(numbers):
    return sum(numbers)


@celery_app.task()
def xsum3(numbers):
    return sum(numbers)

打开terminal开始运行

celery -A CronTab worker -l info 可以看到如下输出

 -------------- [email protected] v3.1.25 (Cipater)

----  ----- 

--- * ***  * -- Darwin-17.3.0-x86_64-i386-64bit

-- * -  --- 

- ** ---------- [config]
- ** ---------- .> app:         app:0x1033ad978  
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
  -- *** ---- 
  --- * ----- [queues]
   -------------- .> default          exchange=tasks(topic) key=task.#

Options:
-A APP, --app=APP app instance to use (e.g. module.attr_name) 指定实例app
-b BROKER, --broker=BROKER 指定broker

--loader=LOADER name of custom loader class to use.
--config=CONFIG Name of the configuration module
--workdir=WORKING_DIRECTORY

-C, --no-color
-q, --quiet
--version show program‘s version number and exit
-h, --help show this help message and exit

-A 是指定项目 worker 执行,

-l 日志级别

再打开一个terminal 开始

消费

In [1]: from apps.message import tasks

In [2]: tasks.div.delay(1,2)

Out[2]: <AsyncResult: 7b8db8fc-3119-48c5-8d95-c74a283ae8b2>

注意调用的时候要加delay 语句,这样的话,就会发送给任务队列,让任务队列来执行该语句。

这个时候我们再切回到worker进程

消费

[2018-02-14 10:00:53,963: INFO/MainProcess] Task apps.message.tasks.div[7b8db8fc-3119-48c5-8d95-c74a283ae8b2] succeeded in 0.008831199025735259s: 0.5

可以看见就这样队列执行了你的任务,这就是celery 任务队列生产消费的过程

使用不同的队列

当你有很多任务需要执行的时候,不要只使用默认的queue,这样会相互影响,并且拖慢任务执行的,导致重要的任务不能被快速的执行。

CELERY_QUEUES = (  # 定义任务队列

    Queue(‘default‘, routing_key=‘task.#‘),  # 路由键以“task.”开头的消息都进default队列

    Queue(‘web_tasks‘, routing_key=‘web.#‘),  # 路由键以“web.”开头的消息都进web_tasks队列

)

CELERY_DEFAULT_EXCHANGE = ‘tasks‘  # 默认的交换机名字为tasks

CELERY_DEFAULT_EXCHANGE_TYPE = ‘topic‘  # 默认的交换类型是topic

CELERY_DEFAULT_ROUTING_KEY = ‘task.default‘  # 默认的路由键是task.default,这个路由键符合上面的default队列

CELERY_ROUTES = {

    ‘apps.message.tasks.add‘: {  # tasks.add的消息会进入web_tasks队列

        ‘queue‘: ‘web_tasks‘,

        ‘routing_key‘: ‘web.add‘,
    },
    ‘apps.message.tasks.div‘: {  # tasks.add的消息会进入web_tasks队列

        ‘queue‘: ‘web_tasks‘,

        ‘routing_key‘: ‘web.div‘,
    }
}

然后执行命令

celery -A CronTab worker -l info -Q web_task

这样的话,只会执行文在web_task中的任务,其他任务都不在执行

启动多个workers执行不同的任务

在同一台机器上,对于优先级不同的任务最好启动不同的worker去执行,比如把实时任务和定时任务分开,把执行频率高的任务和执行频率低的任务分开,这样有利于保证高优先级的任务可以得到更多的系统资源,同时高频率的实时任务日志比较多也会影响实时任务的日志查看,分开就可以记录到不同的日志文件,方便查看。

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h

可以像这样启动不同的worker,%h可以指定hostname,详细说明可以查看官方文档
高优先级的任务可以分配更多的concurrency,但是并不是worker并法数越多越好,保证任务不堆积就好。

定时任务

在 celeryconfig.py 文件中添加如下配置

CELERYBEAT_SCHEDULE = {

    ‘add‘: {
        ‘task‘: ‘apps.message.tasks.add‘,
        ‘schedule‘: timedelta(seconds=1),
        ‘args‘: (16, 16)
    }

然后新开一个命令行启动

celery beat -A CronTab

可以看见任务开始不停的向队列发送

[2018-02-14 09:25:42,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:43,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:44,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:45,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:46,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:47,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:48,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:49,691: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)

回到worker 可以看到,开始没秒钟接收并且消费了

```[2018-02-14 11:09:06,332: INFO/MainProcess] Received task: apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6][2018-02-14 11:09:06,332: INFO/Worker-8] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}

[2018-02-14 11:09:06,332: INFO/Worker-1] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}

[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.001320198003668338s: 32

[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.0014644850161857903s: 32

```

以上是关于Celery配置的主要内容,如果未能解决你的问题,请参考以下文章

python 关于celery的异步任务队列的基本使用(celery+redis)采用配置文件设置

django+celery配置(定时任务)

celery配置

celery application

Celery增加Systemd配置

如何使用 Django 配置 Celery 守护进程