Celery 配置
最近项目中需要做一些定时任务的工作,之前都是用 LInux 的Crontab 但是任务多了之后 不好维护也没有什么监控的措施。所以考虑使用Celery 来解决这一问题。
1.安装
pip install celery-with-redis
注意:其实celery 支持多中broker
Name | Status | Monitoring | Remote Control |
---|---|---|---|
RabbitMQ | Stable | Yes | Yes |
Redis | Stable | Yes | Yes |
Amazon SQS | Stable | No | No |
Zookeeper | Experimental | No | No |
可以看见redis 和RabbitMq支持的是最好的,其中官方推荐RabbitMq,因为redis有断电或者重启丢失数据的风险。不过在这里我因为方便使用的是redis
2 配置
安装之后开始写配置文件 celeryconfig.py
BROKER_URL = ‘redis://127.0.0.1:6379/1‘ #消息队列选用
CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/0‘#结果存储
CELERY_TASK_SERIALIZER = ‘msgpack‘#任务序列化方式
CELERY_RESULT_SERIALIZER = ‘json‘#结果序列化方式
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = [‘json‘, ‘msgpack‘]#接收序列化方式,masgpack序列化的方式效率很高。
创建celery 实例
app = Celery(‘app‘, include=[‘YoutProjectName.tasks‘])
app.config_from_object(‘YoutProjectName.celeryconfig‘) # 设置环境变量
3运行代码
创建YouProjName.tasks.py
# -*- coding: utf-8 -*-
#因为celery 不支持隐式导入
from __future__ import absolute_import, unicode_literals
import random
#引入app实例
from CronTab import celery_app
#引入日志类
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
#celery 装饰器,可以在运行的时候讲任务加到队列 这里是注册,将任务加到了注册中心,这样才能消费者才能收到
@celery_app.task()
def mul(x, y):
total = x * (y * random.randint(3, 100))
return total
@celery_app.task()
def xsum(numbers):
return sum(numbers)
@celery_app.task()
def xsum3(numbers):
return sum(numbers)
打开terminal开始运行
celery -A CronTab worker -l info 可以看到如下输出
-------------- [email protected] v3.1.25 (Cipater)
---- -----
--- * *** * -- Darwin-17.3.0-x86_64-i386-64bit
-- * - ---
- ** ---------- [config]
- ** ---------- .> app: app:0x1033ad978
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- *** ----
--- * ----- [queues]
-------------- .> default exchange=tasks(topic) key=task.#
Options:
-A APP, --app=APP app instance to use (e.g. module.attr_name) 指定实例app
-b BROKER, --broker=BROKER 指定broker
--loader=LOADER name of custom loader class to use.
--config=CONFIG Name of the configuration module
--workdir=WORKING_DIRECTORY
-C, --no-color
-q, --quiet
--version show program‘s version number and exit
-h, --help show this help message and exit
-A 是指定项目 worker 执行,
-l 日志级别
再打开一个terminal 开始
消费
In [1]: from apps.message import tasks
In [2]: tasks.div.delay(1,2)
Out[2]: <AsyncResult: 7b8db8fc-3119-48c5-8d95-c74a283ae8b2>
注意调用的时候要加delay 语句,这样的话,就会发送给任务队列,让任务队列来执行该语句。
这个时候我们再切回到worker进程
消费
[2018-02-14 10:00:53,963: INFO/MainProcess] Task apps.message.tasks.div[7b8db8fc-3119-48c5-8d95-c74a283ae8b2] succeeded in 0.008831199025735259s: 0.5
可以看见就这样队列执行了你的任务,这就是celery 任务队列生产消费的过程
使用不同的队列
当你有很多任务需要执行的时候,不要只使用默认的queue,这样会相互影响,并且拖慢任务执行的,导致重要的任务不能被快速的执行。
CELERY_QUEUES = ( # 定义任务队列
Queue(‘default‘, routing_key=‘task.#‘), # 路由键以“task.”开头的消息都进default队列
Queue(‘web_tasks‘, routing_key=‘web.#‘), # 路由键以“web.”开头的消息都进web_tasks队列
)
CELERY_DEFAULT_EXCHANGE = ‘tasks‘ # 默认的交换机名字为tasks
CELERY_DEFAULT_EXCHANGE_TYPE = ‘topic‘ # 默认的交换类型是topic
CELERY_DEFAULT_ROUTING_KEY = ‘task.default‘ # 默认的路由键是task.default,这个路由键符合上面的default队列
CELERY_ROUTES = {
‘apps.message.tasks.add‘: { # tasks.add的消息会进入web_tasks队列
‘queue‘: ‘web_tasks‘,
‘routing_key‘: ‘web.add‘,
},
‘apps.message.tasks.div‘: { # tasks.add的消息会进入web_tasks队列
‘queue‘: ‘web_tasks‘,
‘routing_key‘: ‘web.div‘,
}
}
然后执行命令
celery -A CronTab worker -l info -Q web_task
这样的话,只会执行文在web_task中的任务,其他任务都不在执行
启动多个workers执行不同的任务
在同一台机器上,对于优先级不同的任务最好启动不同的worker去执行,比如把实时任务和定时任务分开,把执行频率高的任务和执行频率低的任务分开,这样有利于保证高优先级的任务可以得到更多的系统资源,同时高频率的实时任务日志比较多也会影响实时任务的日志查看,分开就可以记录到不同的日志文件,方便查看。
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h
可以像这样启动不同的worker,%h可以指定hostname,详细说明可以查看官方文档
高优先级的任务可以分配更多的concurrency,但是并不是worker并法数越多越好,保证任务不堆积就好。
定时任务
在 celeryconfig.py 文件中添加如下配置
CELERYBEAT_SCHEDULE = {
‘add‘: {
‘task‘: ‘apps.message.tasks.add‘,
‘schedule‘: timedelta(seconds=1),
‘args‘: (16, 16)
}
然后新开一个命令行启动
celery beat -A CronTab
可以看见任务开始不停的向队列发送
[2018-02-14 09:25:42,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:43,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:44,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:45,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:46,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:47,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:48,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:49,691: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
回到worker 可以看到,开始没秒钟接收并且消费了
```[2018-02-14 11:09:06,332: INFO/MainProcess] Received task: apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6][2018-02-14 11:09:06,332: INFO/Worker-8] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}
[2018-02-14 11:09:06,332: INFO/Worker-1] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}
[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.001320198003668338s: 32
[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.0014644850161857903s: 32
```