Django 学习之Celery(芹菜)
Posted yungiu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Django 学习之Celery(芹菜)相关的知识,希望对你有一定的参考价值。
Celery 介绍
文档:http://docs.celeryproject.org/en/latest/index.html
Celery 是一个功能完备,即插即用的异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务的执行。
任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(RabbitMQ, Redis等)
Celery 特点
简单,易于使用和维护,有丰富的文档
高效,单个Celery 进程 每分钟可以处理数百万个任务
灵活,Celery 种几乎每个部分都可以自定义扩展
Celery 非常易于集成到一些Web开发框架中
Celery角色
Celery clinet:这是任务生产者,它负责将任务(tasks)发送到broker中
Broker:broker 负责将任务分发给响应的Celery worker
Celery worker:这是任务的执行者,完成相应的业务逻辑,在具体实现上体现为Python 函数
实现过程
Celery 通过消息进行通信,通常使用一个叫broker(中间人)来协作client(任务的发出者)和worker(任务的处理者),client 发出消息到队列中,broker将队列中的消息发给worker 来处理
一个Celery系统可以包含很多的worker和broker,可增强横向扩展和高可用性能
Celery组成结构
Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成
1.消息中间件:Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,官方推荐RabbitMQ和Redis作为消息队列
2.任务执行单元:Worker 是Celery提供的任务执行的单元,worker 并发的运行在分布式的系统节点中
3.任务结果存储:Task result store 用来存储worker 执行的任务的结果,Celery 支持以不同方式存储任务的结果 包括AMQP, Redis,memcached,MongoDB,SQLAlchemy,Django ORM等等
Celery 适应场景
Celery 适用异步处理问题,比如发送邮件、文件上传、图像处理等比较耗时的操作,我们可以将其加入到任务队列去异步执行,这样用户不需要等待很久,提高用户体验。
1.异步任务处理:例如给注册用户发送短信或者确认邮件的任务
2.大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长
3.定时执行的任务:支持任务的定时执行和设定时间执行,例如性能压测定时执行。
Celery 的实现原理
Celery基本实现原理是amqp协议的一个实现,比较耗时,耗资源的操作通过amqp协议发送到远端,让远端去处理它,在url进来,通过view返回的时候,在前端能够得到很快的响应,任务都在后台处理掉,提高用户的体验度,能够跟框架结合,执行一些可能不是立马需要结果的任务,这就是大家都用的异步任务。
任务队列中包含任务的工作单元,有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理,是一种跨线程、跨机器工作的一种机制。
Celery 的在Django中的实际使用(Django1.11.11, Celery4.0.2)
1.在项目下创建具有标识Celery的Python包(脚本名),比如:celery_tasks
2.在celery_tasks 包中创建main.py,用于作为Celery的启动文件
1)为Celery使用Django配置文件
import os
if not os.getenv(‘DJANGO_SETTINGS_MODULE‘):
os.environ[‘DJANGO_SETTINGS_MODULE‘] = ‘项目名.settings‘
2)创建Celery 对象
from celery import Celery
app = Celery(‘celery_tasks‘) # 一般设置main的参数为脚本名
3)加载配置文件
app.config_from_object(‘celery_tasks.config‘)
4)设置自动加载任务的任务名
app.autodiscover_tasks([‘celery_tasks.xxx‘]) # xxx为celery_tasks 下的任务名
3.在celery_tasks包中创建config.py 文件,用于保存Celery 的配置信息(推荐使用方法一RabbitMQ)
1)RabbitMQ
RabbitMQ是Celery的默认代理,因此除了要使用的代理实例的URL位置之外,它不需要任何其他依赖项或初始配置
broker_url = ‘amqp://myuser: [email protected]:5672/myvhost‘
# 也可以在实例Celery 对象的时候指定 如:
# app = Celery(‘celery_tasks‘,
# broker=‘amqp://myuser: [email protected]:5672/myvhost‘,
# backend=‘rpc://‘
# )
# backend 参数标识celery worker 执行完的结果需要保存,rpc 表示通过RPC(Remote Procedure Call)模式被送到RabbitMQ,
# 如果不指定backend 参数,任务结果将被丢弃
2)Redis
broker_url = ‘redis://127.0.0.1/14‘
result_backend = ‘redis://127.0.0.1/15‘
# 也可以在实例Celery 对象的时候指定 如:
# app = Celery(‘celery_tasks‘,
# broker=‘redis://127.0.0.1/14‘,
# backend=‘redis://127.0.0.1/15‘
# )
4.创建任务,并让Celery 检测 (检测就是执行‘2.4)’)
如:发送短信
from libs.yuntongxun.sms import CCP
from celery_tasks.main import app
# 可以设置name参数,不设置任务名默认是该函数名的路径(如:.celery_tasks.sms.send_sms_code)
# app.task装饰符告诉Celery 这个函数并不在celery client 端执行,当他们被调用时只将调用信息通过# brocker 发送给celery workers 执行
@app.task(name=‘send_sms_code‘)
def send_sms_code(mobile, sms_code):
ccp = CCP()
ccp.send_template_sms(mobile, [sms_code, 5], 1)
5.调用Celery任务
from celery_tasks.sms.tasks
1)delay方法,比如:
tasks.send_sms_code.delay(mobile, sms_code)
2)apply_async方法,比如:
# tasks.send_sms_code.apply_async((mobile, sms_code))
tasks.send_sms_code.apply_async((mobile, sms_code), queue=‘xxx‘)
由此可以看出,delay方法是apply_async 简化版本,但是apply_async方法可以带非常多的配置参数,包括指定队列等,queue指定队列名称,把不同任务分配不同的队列(推荐使用delay方法)
6.执行命令,启动worker 让Celery 单独执行
celery -A Celery实例对象路径 worker -l info
celery -A celery_tasks.main worker -l info 或 celery -A celery_tasks.main worker --loglevel=info
7.任务的状态
PENDING : 等待
STARTED : 开始
RETRY: 重试
FAILURE: 失败
SUCCESS : 成功
如:result = tasks.send_sms_code.delay(mobile, sms_code)
通过result.state 或result.status来查看任务的状态
更多 result 方法 查看http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result
以上是关于Django 学习之Celery(芹菜)的主要内容,如果未能解决你的问题,请参考以下文章
如何在任务中获取芹菜结果模型(使用 django-celery-results)