Celery框架

Posted mofujin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery框架相关的知识,希望对你有一定的参考价值。

  一.Celery框架 独立运行的框架  

  1.1celery框架自带一个socket 底层通信接口 相当于起了一24 小时不间断的服务运行的项目(服务端) 好比一个死循环 不依赖Djagno 框架(wgiref 实现的并发 帮我们起的Djamgo 项目),mysql 也是自带一个socket相当于启动了一个进程 接外界所有我们外界请求的客户端任务

  1.2 目的:启动celery框架是执行服务中的任务的 服务中带一个执行任务的对象,会执行准备就绪的任务,将执行任务的结果保存起来.

  1.3 Celery框架的三大组成部分:

  (1)存放要执行的任务 >>> broker(中间件)

  (2)执行任务的对象worker

  (3)存放执行任务结果的banckend

  1.4 安装的celery主体模块, 默认只提供一个woker对象 , 要结合其他技术提供broker(任务)和backend 存放执行的结果 (结果是需要我们去拿)(Rabbit, Rdis) 

   工作流程图

技术图片

 

    1.5 使用场景:

  1.将大量耗时的任务交给celery 去做 异步(socket)

  2 .爬虫 将每天定时的任务交给celery 每天 定时 或者多长时间或延缓多长时间再执行任务(定时任务)

   3. 安装celery 

  执行指令:pip install celery 

  4. celery配置 

  (1)简单使用

   celery 执行开不任务 (不用等待结果 执行其他任务 有结果就返回 提供效率)

  Celery 包架构的封装

  # 项目的创建

```python
project
    ├── celery_task      # celery包 init 包实现包内导包的功能 通过点语法
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py      # 添加任务
    └── get_result.py   # 获取结果

  # 基本使用 

  1)创家app+任务  >>> celery_task 中创建task1 或者task2 多个任务

   任务一

# 保内导包

from . celery import app   # 通过应用去执行
# 函数 >>> 一个任务
@app.task
def add(a,b):
    print(a,b)
    return a+b  #  执行结果

   任务二

  

#导包
from .celery import app
# 添加ajpp应用
@app.task
def low(n1,n2):
    print(减法:%s%(n1-n2))
 

  添加app应用

# 文件必须叫celery >>> 主文件 安装pip install celery
"""


import socket
server = socket.socket()
server.bind((‘localhost‘,8080))
server.listen(5)
# 链接循环
while True:
    # 24小时只要机器不停 一致服务
    conn, addr = server.accept()
"""

# celery.py 文件添加app 和 添加任务 
# import celery  # 这个不是文件名的celery哦 这是我们安装的框架
from celery import Celery

# 通过Celery 提供一个celery 应用
# 存储任务和结果的路径
broker = redis://127.0.0.1/14   # 任务仓库
backend = redis://127.0.0.1/15  # 结果仓库
# 包含的任务
include = [celery_task.task1,celery_task.task2]  # 任务们 函数们(需求)
# 应用
app = Celery(broker=broker, backend=backend, include=include)


# 准备去开启任务 文件task1.2 去提供任务  >>> worker 执行

# 如何启动 简单版在文件"添加celery任务"

  2) 启动celery(app) 服务

  windows :celery  worker -A  celery_task  -1 info  -p  eventlet

  如果执行结果报错时

  # pip3 install eventlet  

  非wind ows  ; celery -A   cerler_task -  1  info 

  3)


  添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本

 

  获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本

  celery 的使用 

  测试结果 启动任务的文件 执行脚本

  (1)立即添加任务

from celery_task import task1,task2

# 调用celery框架的方法 完成任务的添加

# 手动添加立即任务 调用delay就相当于将add 交给celery进行调用
res = task1.add.delay(100,150)
print(res)


res1 = task2.low.delay(200,100)
print(res1)

 

  (2)手动添加延时任务

# 手动添加延任务

from datetime import datetime, timedelta
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay

res = task2.low.apply_async(args=(200, 50), eta=eta_second(10))  # 延迟十秒后执行
print(res)

 

 

技术图片

 

 

 

 result。py 获取执行任务的结果

from celery_task.celery import app

from celery.result import AsyncResult

id = db733c22-a5fd-43e6-8f31-9dec0acf5f64
if __name__ == __main__:
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print(任务失败)
    elif async.status == PENDING:
        print(任务等待中被执行)
    elif async.status == RETRY:
        print(任务异常后正在重试)
    elif async.status == STARTED:
        print(任务已经开始被执行)

   去redis 中获取task_id 即可获取执行的结果 结果不会执行的传给谁 如果需要我们取就行了

  (3)高级使用

  task 任务

  

# 执行的 任务集合
from . celery import app  # 同一级别的 用点 去获取另一个名称空间的值


@app.task
def jump(a,b):
    print(a,b)
    return 两数乘:%s  % (a*b)


# @app.task
# def low(c, d):
#     print(c, d)
#     return c-d

  celery 添加任务

from celery import Celery

broker = redis://127.0.0.1:6379/8
backend = redis://127.0.0.1:6379/10
app = Celery(broker=broker, backend=backend, include=[celery_task.tasks])


# 时区
app.conf.timezone = Asia/Shanghai
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置 添加执行的任务
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    jump-task: {
        task: celery_task.tasks.jump,
        schedule: timedelta(seconds=3),  # 每三秒执行一次
        # ‘schedule‘: crontab(hour=8, day_of_week=1),  # 每周一早八点
        args: (300, 150),
    }
}

  执行的命令

  worker 的指令  socket  任务先开起

   

# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet

  beat 的服务socket 后启动指令

  命令:celery beat -A celery_task -l info

  ctr+c 停止服务

  (4) 定时任务和场景

  ********celery 三种任务的使用场景 - 重点 

  url:发送短信的数据/email/-发送者的地址 与 发送消息(发送时间)

       视图类:EamilAPIView - post

  psot逻辑:

    1)交给celery异步立即执行:拿到celery_task 包中的任务 , 调通delay就可以将任务交给celery异步执行

    2)定时发送短息(延时指定某个时间点发送);拿到celery_task包中的任务 ,根据发送时间去顶延时执行

      3)定时任务:定时爬虫、 定时更新(接口缓存更新)

 

 

  (5)celery 再接口中缓存

from celery import Celery

broker = redis://127.0.0.1:6379/8
backend = redis://127.0.0.1:6379/10
app = Celery(broker=broker, backend=backend, include=[celery_task.tasks])


# 时区
app.conf.timezone = Asia/Shanghai
# 是否使用UTC
app.conf.enable_utc = False
from celery import Celery

broker = redis://127.0.0.1:6379/1
backend = redis://127.0.0.1:6379/2
app = Celery(broker=broker, backend=backend, include=[celery_task.tasks])


# 时区
app.conf.timezone = Asia/Shanghai
# 是否使用UTC
app.conf.enable_utc = False

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    django-task: {
        task: celery_task.tasks.update_banner_list,
        schedule: timedelta(seconds=3),
        args: (),
    }
}

 

  tasks

# 加载页面每十秒更新一次 图片
from .celery import app
from home.models import Banner  # 页面

from settings.const import BANNER_COUNT  # 论播图的最大展示条数
from django.core.cache import cache  # 加载到缓存
from home.serializers import BannerModelSerializer

@app.task
def update_banner_list():
    # 获取最新内容
    banner_query = Banner.objects.filter(is_delete=False,is_show=True)
    # 序列化
    banner_data = BannerModelSerializer(banner_query,many=True).data

    for banner in banner_data:
        banner[image] = http://127.0.0.1:8000 + banner[image]
    # 更新缓存
    cache.set(banner_list,banner_data)
    return True

  注意点:(1)

技术图片

 

   (2)后端启动

技术图片

 

 

  待更新。。。。

  

以上是关于Celery框架的主要内容,如果未能解决你的问题,请参考以下文章

Celery框架

Celery异步任务框架

Celery框架的基本使用与介绍

Celery框架

Celery框架

celery异步执行任务框架