celery框架
Posted shaozheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery框架相关的知识,希望对你有一定的参考价值。
目录
celery框架:
介绍:
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
celery 组成: broker | worker | backend
消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。
使用场景:
Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。
还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。
延时任务:解决延迟任务
注意: RabbitMQ : 异步的消息队列 (线上使用)
celery + redis :
环境搭建:
配置:
pip install celery
pip install django-redis
# Windows中还需要安装以下模块,用于任务执行单元
pip install eventlet
redis-- settings.py:
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100}
# "PASSWORD": "123",
}
}
}
任务结构:
创建文件目录结构:
pro_cel
├── celery_task# celery相关文件夹
│ ├── celery.py # celery连接和配置相关文件,必须叫这个名字
│ └── tasks1.py # 所有任务函数
│ └── tasks2.py # 所有任务函数
├── check_result.py # 检查结果
└── send_task.py # 触发任务
注意,检查结果与触发任务的模块不能写在celery_task模块中,不然会报导入celery的错误。
celery框架工作流程
1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
3)完成提供的任务的定时配置app.conf.beat_schedule
4)启动celery服务,运行worker,执行任务
5)启动beat服务,运行beat,添加任务
任务实现:
基本使用:
1. 创建环境: celery 环境搭建
2.创建app + 任务
(创建: broker + app + backend)
3.执行任务:
# 非windows
celery worker -A celery_task -l info
# windows:
pip3 install eventlet
celery worker -A celery_task -l info -P eventlet
4.添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
5.获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
eg:
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
task.py:
from .celery import app
@app.task
def add(n, m)
pass
定时任务
celery.py:
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0' # 无密码
# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'
# 生成celery对象,'task'相当于key,用于区分celery对象(任意名)
# include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=[
'celery_task.add_task',
'celery_task.send_email'
])
# 时区 (可设置任意一个)
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时执行
app.conf.beat_schedule = {
# 名字随意命名
'add-every-5-seconds': {
# 执行add_task下的addy函数
'task': 'celery_task.add_task.add',
# 每10秒执行一次
'schedule': timedelta(seconds=10),
# add函数传递的参数
'args': (1, 2)
},
'add-every-10-seconds': {
'task': 'celery_task.add_task.add',
# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
'schedule': crontab(minute=5),
'args': (1, 2)
}
}
send_msg.py:
#项目配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com' # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '1504703554@qq.com' # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = '授权码' # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '2333<'1504703554@qq.com>'
EMAIL_USE_SSL = True # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
import os
if __name__ == "celery_task.send_email":
# 因为需要用到django中的内容,所以需要配置django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "do_celery.settings")
import django
django.setup()
# 导入celery对象app
from celery_task.celery import app
from app01 import models
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
@app.task
def send_email1(id): # 此时可以直接传邮箱,还能减少一次数据库的IO操作
# 此处的id由用户注册的视图函数中传入
user_obj = models.UserInfo.objects.filter(pk=id).first()
email = user_obj.email
# 启用线程发送邮件,此处最好加线程池
t = threading.Thread(target=send_mail, args=(
"激活邮件,点击激活账号", # 邮件标题
'点击该邮件激活你的账号,否则无法登陆', # 给html_message参数传值后,该参数信息失效
settings.EMAIL_HOST_USER, # 用于发送邮件的邮箱地址
[email], # 接收邮件的邮件地址,可以写多个
),
# html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
kwargs={'html_message': "<a href='http://127.0.0.1:8000/active_user/?id=%s'>点击激活gogogo</a>" % id}
)
t.start()
check_result.py:
from celery.result import AsyncResult
from celery_task.celery import app
def check_result(task_id):
async1 = AsyncResult(id=task_id, app=app)
if async1.successful():
result = async1.get()
print(result)
return result
# result.forget() # 将结果删除
# async.revoke(terminate=True) # 无论现在是什么时候,都要终止
# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async1.failed():
print('执行失败')
return '执行失败'
elif async1.status == 'PENDING':
print('任务等待中被执行')
return '任务等待中被执行'
elif async1.status == 'RETRY':
print('任务异常后正在重试')
return '任务异常后正在重试'
elif async1.status == 'STARTED':
print('任务已经开始被执行')
return '任务已经开始被执行'
执行:
启用任务执行单元worker(以windows为例):
celery worker -A celery_task -l info -P eventlet
app.conf.beat_schdule定时任务时,还需要启用beat,用于定时朝消息队列提交任务:
celery beat -A celery_task -l info
延时任务:
from celery_app_task import add
from datetime import datetime
# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async并设定时间,这里是10秒后执行任务
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
django + celery:
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
cel = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1', include=[
'celery_task.tasks1',
'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字随意命名
'add-every-10-seconds': {
# 执行tasks1下的test_celery函数
'task': 'celery_task.tasks1.test_celery',
# 每隔2秒执行一次
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=2),
# 传递参数
'args': ('test',)
},
}
执行:
# 启动一个beat
celery beat -A celery_task -l info
# 启动work执行
celery worker -A celery_task -l info -P eventlet
以上是关于celery框架的主要内容,如果未能解决你的问题,请参考以下文章