Celery框架
Posted tfzz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery框架相关的知识,希望对你有一定的参考价值。
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
pip install celery
2.建立文件夹和py文件
project # 一般为项目名如:大路飞
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
3.代码编写
#1.app对象 :
# 导入模块
from celery import Celery
broker = ‘redis://127.0.0.1:6379/14‘ # 任务库:实际情况选择
backend = ‘redis://127.0.0.1:6379/15‘ # 结果库:实际情况选择
include=[‘celery_task.tasks‘] # 任务们,注意路径
# 实例产生app对象
app = Celery(broker=broker,backend=backend,include=include)
#2.任务:
# 导入app实例对象 from .celery import app # 任务1 @app.task def add(n, m): print(n) print(m) print(‘n+m的结果:%s‘ % (n + m)) return n + m # 任务2 @app.task def low(n, m): print(n) print(m) print(‘n-m的结果:%s‘ % (n - m)) return n - m
#3.添加任务:
# 导入任务文件tasks.py from celery_task import tasks # 添加立即执行任务 t1 = tasks.add.delay(10, 20) t2 = tasks.low.delay(100, 50) print(t1.id) # 添加延迟任务 from datetime import datetime, timedelta def eta_second(second): ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 这里单位为秒,实际情况更改 time_delay = timedelta(seconds=second) return utc_ctime + time_delay tasks.low.apply_async(args=(200, 50), eta=eta_second(10))
# 4.获取结果:
# 导入实例对象 from celery_task.celery import app from celery.result import AsyncResult # 任务id,实际情况更改 id = ‘21325a40-9d32-44b5-a701-9a31cc3c74b5‘ if __name__ == ‘__main__‘: async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print(‘任务失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
#5.启动服务运行
# 注意cd 到project文件夹下载执行命令
# 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本 # 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
pip install celery
2.建立文件夹和py文件
project # 一般为项目名如:大路飞
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
└── get_result.py # 获取结果
3.代码编写
#1.app对象 :
# 导入模块 from celery import Celery broker = ‘redis://127.0.0.1:6379/14‘ # 任务库:实际情况选择 backend = ‘redis://127.0.0.1:6379/15‘ # 结果库:实际情况选择 include=[‘celery_task.tasks‘] # 任务们,注意路径 # 实例产生app对象 app = Celery(broker=broker,backend=backend,include=include) # 时区,实际情况选择 app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC,实际情况选择 app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { # 任意任务名 ‘low-task‘: { ‘task‘: ‘celery_task.tasks.low‘, # 任务源,注意路径 ‘schedule‘: timedelta(seconds=3), # 3秒加载执行一次 # ‘schedule‘: crontab(hour=8, day_of_week=1), # 每周一早八点 ‘args‘: (300, 150), # 任务的参数 } }
# 2.获取结果:
from celery_task.celery import app from celery.result import AsyncResult id = ‘21325a40-9d32-44b5-a701-9a31cc3c74b5‘ if __name__ == ‘__main__‘: async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print(‘任务失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
#3.启动服务
# 注意cd 到project文件夹下载执行命令 1.worker cmd窗口 # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet 2. 添加任务cmd窗口 # 3)添加任务:自动添加任务,所以要启动一个添加任务的服务 # 命令:celery beat -A celery_task -l info 3.获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
#2.代码编写:celery.py
# django环境配置 # 开启django支持 import os, django # 如果包文件不在根目录下根据实际情况将大路飞添加环境便令 #import sys # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘luffyapi.settings.dev‘) django.setup() # 导入模块 from celery import Celery broker = ‘redis://127.0.0.1:6379/14‘ # 任务库 backend = ‘redis://127.0.0.1:6379/15‘ # 结果库 include = [‘celery_task.tasks‘] # 实例产生app对象 app = Celery(broker=broker,backend=backend,include=include) # 时区 app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { ‘update_banner_list_task‘: { ‘task‘: ‘celery_task.tasks.update_banner_list‘, ‘schedule‘: timedelta(seconds=10), # ‘schedule‘: crontab(hour=8, day_of_week=1), # 每周一早八点 ‘args‘: (), } }
#3.tasks.py
# 导入app对象 from .celery import app # 导入home应用的模型表 from home.models import Banner # 导入设置中的轮播数量 from settings.const import BANNER_COUNT # 导入home应用的序列化 from home.serializers import BannerModelSerializer from django.core.cache import cache @app.task def update_banner_list(): banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by(‘-orders‘)[:BANNER_COUNT] # 序列化 banner_data = BannerModelSerializer(banner_query,many=True).data for banner in banner_data: banner[‘image‘]= ‘http://127.0.0.1:8000‘+ banner[‘image‘] # 存入缓存数据库 cache.set(‘banner_list‘,banner_data) return True
#4.启动服务
# 注意cd 到project文件夹下载执行命令如大路飞
1.worker cmd窗口
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
2. 添加任务cmd窗口
# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info
以上是关于Celery框架的主要内容,如果未能解决你的问题,请参考以下文章