一、 基本介绍
Celery是一个专注于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。
使用Celery常见场景:
- Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
- 定时任务。生产环境经常会跑一些定时任务。假如有上千台的服务器、上千种任务,定时任务的管理会很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
- 其他可以异步执行的任务。为了充分提高网站性能,对于请求和响应之外的那些不要求必须同步完成的附加工作都可以异步完成。比如发送邮件/短信、推送消息、清理/设置缓存等。
Celery特性:
- 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
- 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
- 方便把任务和配置管理相关联。
- 可选多进程、Evenlent和Gevent三种模式并发执行。
- 提供多种错误处理机制。
- 提供多种任务原语,方便实现任务分组、拆分和调用链。
- 支持多种消息代理和存储后端。
Celery架构图:
产生任务的方式有两种:
- 发布者发布任务(Web应用)
- 任务调度按期发布任务(定时任务)
Celery组件介绍:
- Celery Beat: 任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Celery Worker: 执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Broker: 消息代理,或者叫做消息中间件,接受任务生产者发送过来的任务消息,存进队列在按序分发给任务消费方。
- Producer: 调用Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Result Backend: 任务处理完后保存状态信息和结果,以供查询。Celery默认支持Redis、RabbitMQ、MongoDB、Django ORM SQLAlchemy等方式。
Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、Zookeeper、SQLAlchemy等作为消息代理,但用于生产环境只有RabbitMQ和Redis,官方推荐RabbitMQ来作为Celery的消息代理。
在客户端和消费者之间传输数据需要序列化和反序列化,Celery支持的序列化方案如下图
名称 | 介绍 |
---|---|
pickle | pickle是Python标准库中的一个模块,支持python内置的数据结构,但是它是Python的专有协议,从Celery 3.2开始,由于安全性等原因Celery拒绝支持pickle这个方案 |
json | json支持多种语言,可用于跨语言方案 |
yaml | yaml的表达能力更强,支持的数据类型比json多,但是python客户端的性能不如json |
msgpack | msgpack是一个二进制的类json的序列化方案,但是比json的数据结构更小、更快 |
二、安装配置Celery
为了提供更高的性能,采用如下方案:
- 选择RabbitMQ作为消息代理。
- RabbitMQ的Python客户端选择librabbitmq这个C库。
- 选择Msgpack做序列化
- 选择Redis做结果存储
sudo apt-get install rabbitmq-server
sudo apt-get install redis-server
sudo pip install "celery[librabbitmq,redis,msgpack]"
三、示例演示
项目目录结构
tree project
project
├── celeryconfig.py
├── celery.py
├── __init__.py
└── tasks.py
主程序celery.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# 拒绝隐士引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确运行
from __future__ import absolute_import
from celery import Celery
# app是 Celery类的实例,创建的时候添加了project.tasks这个模块,也就是包含了project/tasks.py这个文件
app = Celery(‘project‘, include=[‘project.tasks‘])
# 把Celery配置存放进project/celeryconfig文件,使用app.config_from_object加载配置
app.config_from_object(‘project.celeryconfig‘)
if __name__ == ‘__main__‘:
app.start()
任务函数文件tasks.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from project.celery import app
# 让任务函数生效的方法是添加app.task装饰器
@app.task
def add(x, y):
return x + y
配置文件celeryconfig.py:
# -*- coding:utf-8 -*-
BROKER_URL = ‘amqp://guest:[email protected]:5672//‘ # 使用RabbitMQ作为消息代理
CELERY_TASK_PROTOCOL = 1 # 现在celery升级到了4.0,是老版本的librabbitmq与最新的celery4.0 Message Protocol协议不兼容,celery4.0默认使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1
CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘ # 把结果存在Redis
CELERY_TASK_SERIALIZER = ‘msgpack‘ # 任务序列化肯反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = ‘json‘ # 读取任务结果一般性能要求不高,所以使用可读性更好的json
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_ACCEPT_CONTENT = [‘json‘, ‘msgpack‘] # 指定接收的内容类型
启动消费者:
[email protected]:~$ celery -A project.celery worker -l info
-A 参数默认会寻找project.celery这个模块
看到如下信息说明worker服务运行起来了
-------------- [email protected] v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-92-generic-x86_64-with-Ubuntu-16.04-xenial 2017-11-21 09:35:15
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: project:0x7f9d656cd110
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.tasks.add
[2017-11-21 09:35:19,030: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-11-21 09:35:19,662: INFO/MainProcess] mingle: searching for neighbors
[2017-11-21 09:35:22,960: INFO/MainProcess] mingle: all alone
[2017-11-21 09:35:23,063: INFO/MainProcess] [email protected] ready.
上述信息提供了消息代理和存储结果的地址、并发数量、任务列表、交换类型等
开启另一个终端,用ipython调用add函数
In [1]: from project.tasks import add
In [2]: r = add.delay(1,3)
In [3]: r
Out[3]: <AsyncResult: a14d2045-ad40-4240-bbcf-1a8f07899485>
In [4]: r.result
Out[4]: 4
In [5]: r.status
Out[5]: u‘SUCCESS‘
In [6]: r.successful()
Out[6]: True
In [7]: r.backend
Out[7]: <celery.backends.redis.RedisBackend at 0x7faae433a450> # 保存在redis中
worker终端上显示执行了任务:
[2017-11-21 09:37:45,681: INFO/MainProcess] Received task: project.tasks.add[a14d2045-ad40-4240-bbcf-1a8f07899485]
[2017-11-21 09:37:45,923: INFO/ForkPoolWorker-2] Task project.tasks.add[a14d2045-ad40-4240-bbcf-1a8f07899485] succeeded in 0.238272875s: 4
任务的task_id根据上面提到的task_id获得,可以用下面方法获得结果
方法一:
In [9]: task_id = ‘a14d2045-ad40-4240-bbcf-1a8f07899485‘
In [10]: add.AsyncResult(task_id).get()
Out[10]: 4
方法二:
In [12]: from celery.result import AsyncResult
In [13]: AsyncResult(task_id).get()
Out[13]: 4
另外redis库中存放了key为task_id 的值,如下所示
[email protected]:~$ redis-cli
127.0.0.1:6379> keys *
1) "celery-task-meta-a14d2045-ad40-4240-bbcf-1a8f07899485"
127.0.0.1:6379> get celery-task-meta-a14d2045-ad40-4240-bbcf-1a8f07899485
"{"status": "SUCCESS", "traceback": null, "result": 4, "task_id": "a14d2045-ad40-4240-bbcf-1a8f07899485", "children": []}"