Celery框架
Posted mofujin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery框架相关的知识,希望对你有一定的参考价值。
一.Celery框架 独立运行的框架
1.1celery框架自带一个socket 底层通信接口 相当于起了一24 小时不间断的服务运行的项目(服务端) 好比一个死循环 不依赖Djagno 框架(wgiref 实现的并发 帮我们起的Djamgo 项目),mysql 也是自带一个socket相当于启动了一个进程 接外界所有我们外界请求的客户端任务
1.2 目的:启动celery框架是执行服务中的任务的 服务中带一个执行任务的对象,会执行准备就绪的任务,将执行任务的结果保存起来.
1.3 Celery框架的三大组成部分:
(1)存放要执行的任务 >>> broker(中间件)
(2)执行任务的对象worker
(3)存放执行任务结果的banckend
1.4 安装的celery主体模块, 默认只提供一个woker对象 , 要结合其他技术提供broker(任务)和backend 存放执行的结果 (结果是需要我们去拿)(Rabbit, Rdis)
工作流程图
1.5 使用场景:
1.将大量耗时的任务交给celery 去做 异步(socket)
2 .爬虫 将每天定时的任务交给celery 每天 定时 或者多长时间或延缓多长时间再执行任务(定时任务)
3. 安装celery
执行指令:pip install celery
4. celery配置
(1)简单使用
celery 执行开不任务 (不用等待结果 执行其他任务 有结果就返回 提供效率)
Celery 包架构的封装
# 项目的创建
```python project ├── celery_task # celery包 init 包实现包内导包的功能 通过点语法 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
# 基本使用
1)创家app+任务 >>> celery_task 中创建task1 或者task2 多个任务
任务一
# 保内导包 from . celery import app # 通过应用去执行 # 函数 >>> 一个任务 @app.task def add(a,b): print(a,b) return a+b # 执行结果
任务二
#导包 from .celery import app # 添加ajpp应用 @app.task def low(n1,n2): print(‘减法:%s‘%(n1-n2))
添加app应用
# 文件必须叫celery >>> 主文件 安装pip install celery """ import socket server = socket.socket() server.bind((‘localhost‘,8080)) server.listen(5) # 链接循环 while True: # 24小时只要机器不停 一致服务 conn, addr = server.accept() """ # celery.py 文件添加app 和 添加任务 # import celery # 这个不是文件名的celery哦 这是我们安装的框架 from celery import Celery # 通过Celery 提供一个celery 应用 # 存储任务和结果的路径 broker = ‘redis://127.0.0.1/14‘ # 任务仓库 backend = ‘redis://127.0.0.1/15‘ # 结果仓库 # 包含的任务 include = [‘celery_task.task1‘,‘celery_task.task2‘] # 任务们 函数们(需求) # 应用 app = Celery(broker=broker, backend=backend, include=include) # 准备去开启任务 文件task1.2 去提供任务 >>> worker 执行 # 如何启动 简单版在文件"添加celery任务"
2) 启动celery(app) 服务
windows :celery worker -A celery_task -1 info -p eventlet
如果执行结果报错时
# pip3 install eventlet
非wind ows ; celery -A cerler_task - 1 info
3)
添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
celery 的使用
测试结果 启动任务的文件 执行脚本
(1)立即添加任务
from celery_task import task1,task2 # 调用celery框架的方法 完成任务的添加 # 手动添加立即任务 调用delay就相当于将add 交给celery进行调用 res = task1.add.delay(100,150) print(res) res1 = task2.low.delay(200,100) print(res1)
(2)手动添加延时任务
# 手动添加延任务 from datetime import datetime, timedelta def eta_second(second): ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=second) return utc_ctime + time_delay res = task2.low.apply_async(args=(200, 50), eta=eta_second(10)) # 延迟十秒后执行 print(res)
result。py 获取执行任务的结果
from celery_task.celery import app from celery.result import AsyncResult id = ‘db733c22-a5fd-43e6-8f31-9dec0acf5f64‘ if __name__ == ‘__main__‘: async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print(‘任务失败‘) elif async.status == ‘PENDING‘: print(‘任务等待中被执行‘) elif async.status == ‘RETRY‘: print(‘任务异常后正在重试‘) elif async.status == ‘STARTED‘: print(‘任务已经开始被执行‘)
去redis 中获取task_id 即可获取执行的结果 结果不会执行的传给谁 如果需要我们取就行了
(3)高级使用
task 任务
# 执行的 任务集合 from . celery import app # 同一级别的 用点 去获取另一个名称空间的值 @app.task def jump(a,b): print(a,b) return ‘两数乘:%s ‘ % (a*b) # @app.task # def low(c, d): # print(c, d) # return c-d
celery 添加任务
from celery import Celery broker = ‘redis://127.0.0.1:6379/8‘ backend = ‘redis://127.0.0.1:6379/10‘ app = Celery(broker=broker, backend=backend, include=[‘celery_task.tasks‘]) # 时区 app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 添加执行的任务 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { ‘jump-task‘: { ‘task‘: ‘celery_task.tasks.jump‘, ‘schedule‘: timedelta(seconds=3), # 每三秒执行一次 # ‘schedule‘: crontab(hour=8, day_of_week=1), # 每周一早八点 ‘args‘: (300, 150), } }
执行的命令
worker 的指令 socket 任务先开起
# 2)启动celery(app)服务: # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet
beat 的服务socket 后启动指令
命令:celery beat -A celery_task -l info
ctr+c 停止服务
(4) 定时任务和场景
********celery 三种任务的使用场景 - 重点
url:发送短信的数据/email/-发送者的地址 与 发送消息(发送时间)
视图类:EamilAPIView - post
psot逻辑:
1)交给celery异步立即执行:拿到celery_task 包中的任务 , 调通delay就可以将任务交给celery异步执行
2)定时发送短息(延时指定某个时间点发送);拿到celery_task包中的任务 ,根据发送时间去顶延时执行
3)定时任务:定时爬虫、 定时更新(接口缓存更新)
(5)celery 再接口中缓存
from celery import Celery broker = ‘redis://127.0.0.1:6379/8‘ backend = ‘redis://127.0.0.1:6379/10‘ app = Celery(broker=broker, backend=backend, include=[‘celery_task.tasks‘]) # 时区 app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC app.conf.enable_utc = False from celery import Celery broker = ‘redis://127.0.0.1:6379/1‘ backend = ‘redis://127.0.0.1:6379/2‘ app = Celery(broker=broker, backend=backend, include=[‘celery_task.tasks‘]) # 时区 app.conf.timezone = ‘Asia/Shanghai‘ # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { ‘django-task‘: { ‘task‘: ‘celery_task.tasks.update_banner_list‘, ‘schedule‘: timedelta(seconds=3), ‘args‘: (), } }
tasks
# 加载页面每十秒更新一次 图片 from .celery import app from home.models import Banner # 页面 from settings.const import BANNER_COUNT # 论播图的最大展示条数 from django.core.cache import cache # 加载到缓存 from home.serializers import BannerModelSerializer @app.task def update_banner_list(): # 获取最新内容 banner_query = Banner.objects.filter(is_delete=False,is_show=True) # 序列化 banner_data = BannerModelSerializer(banner_query,many=True).data for banner in banner_data: banner[‘image‘] = ‘http://127.0.0.1:8000‘ + banner[‘image‘] # 更新缓存 cache.set(‘banner_list‘,banner_data) return True
注意点:(1)
(2)后端启动
待更新。。。。
以上是关于Celery框架的主要内容,如果未能解决你的问题,请参考以下文章