celery介绍

Posted Yietong309

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery介绍相关的知识,希望对你有一定的参考价值。

目录

官方

架构: 

 celery 是独立的服务

celery包结构

 celery执行异步任务,延时任务,定时任务

异步任务

 延时任务

 定时任务

 django 使用celery

秒杀逻辑

双写一致性


官方

Celery 官网:Celery - Distributed Task Queue — Celery 5.2.7 documentation

Celery 官方文档英文版:Celery - Distributed Task Queue — Celery 5.3.0b1 documentation

Celery 官方文档中文版:Celery - 分布式任务队列 — Celery 3.1.7 文档

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

1. 完成异步任务:可以提高项目的并发量,以前用多线程实现项目的并发量,现在可以使用celery来做

2. 完成延时任务

3. 完成定时任务

架构: 

消息中间件:  broker 提交的任务【函数】都放在这里, celery本身不能提供消息中间件,需要借助于第三方: redis, rabbitmq

任务执行单元: worker,是真正执行任务的的地方, 一个个进程中执行函数

结果存储: backend, 函数return的结果都存储在这里, celery本身不提供结果存储,需要借助于第三方: redis 数据库, rabbitmq

 celery 是独立的服务

1, 可以不依赖任何服务器,通过自身命令来启动服务

2, celery服务为为其他项目服务提供一部解决任务的需求’  ## 会有两个服务同时运行,一个是项目服务,一个是celery服务, 项目服务讲需要异步处理的任务交给celery服务 , celery就会在需要时一步完成项目的需求

安装

psi3 install celery

使用步骤

1, 写一个main.py: 实例化得到app对象, 写函数,任务,注册成celery的任务, 

2, 在别的程序中提交任务》》》提交到broker中去  add.delay(3,4)

 执行异步任务

add.apply_asyn()

add.delay()

 main.py

import time

from celery import Celery

backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'

app = Celery('test',backend=backend,broker=broker)
# 以上实例化得到对象

# 写任务
@app.task
def add(a,b):
    time.sleep(3)
    print(a+b)
    return a+b

s1.py


from main import add

print('good evening')
# 执行的时同步任务
res = add(3,4)
print(res)


# 3 执行异步任务
# add.apply_async()

print(add.delay(2, 7))   # 74523b46-68a5-4429-b061-723ccf3f9b82
print(add.delay(7, 5))   # 25e9b218-55ff-4a0a-a41f-76cde3ff8833

s2.py


def outer(func):
    def inner(*args,**kwargs):
        res = func(*args,**kwargs)
        return res
    inner.delay='xxx'
    return inner

@outer
def add():
    print('aaa')

#
# add.name='yietong'
print(add.delay)



3, 启动worker, 从broker中去任务执行,执行完放到backend中

	    win:    
        	celery worker -A main -l info -P eventlet  # 4.x及之前用这个 
            celery -A main worker -l info -P eventlet  # 5.x及之后用这个
        lin,mac: 
            celery worker -A main -l info
        	celery -A main worker -l info

 eventlet模块需要安装  pip3 install eventlet

4, 在backend 中查看任务执行的结果

直接看或者通过代码查看

from main import app
from celery.result import AsyncResult
id = '5f7bbf70-9946-4085-b993-f5b8a8d0bd11'
if __name__ == '__main__':
    res = AsyncResult(id=id,app=app)
    if res.successful():
        result = res.get()
        print(result)   # 12
    elif res.failed():
        print('任务失败')
    elif res.status == 'PENDING':
        print('任务正在等待中')
    elif res.status== 'STATED':
        print('任务已经开始被执行')

停掉worker后

 重启服务后

celery包结构

项目  

 celery_task【包】
            -__init__.py
            -celery.py
            -user_task.py
            -home_task.py
        add_task.py
        get_result.py

写一个celery包,以后在任意项目中需要使用的时候把包copy进去,导入使用即可。

使用步骤;

  •  新建包: celery_task
  • 在包里先新建一个celery.py
  • 初始化app
import celery
from celery import Celery

app = celery.Celery()



backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
#  一定不要忘了include
app = Celery(__name__,broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])
  • 在包里新建user_task.py 编写用户相关任务
import time

from .celery import app

@app.task
def send_sms(mobile,code):
    time.sleep(1)
    print('短信发送成功:%s,验证码是%s'%(mobile,code))
    return True
  • 在包里新建home_task.py 编写首页相关任务
import time

from .celery import app

@app.task
def add(a,b):
    time.sleep(3)
    print('计算结果时%s'%(a+b))
    return a+b

  • 其他程序提交任务

  • 启动worker》》》它可以先启动【在提交任务之前】》》》cd 到包所在的目录下
celery -A celery_task worker -l info -P eventlet
  • 查看任务执行的结果

 get_result.py

from celery_task.celery import app
from celery.result import AsyncResult

def get(task_id):
    asy = AsyncResult(id=task_id, app=app)
    if asy.successful():
        res = asy.get()
        print('任务执行结果:', res)
    elif asy.failed():
        print('任务失败')
    elif asy.status == 'PENDING':
        print('任务等待中被执行')
    elif asy.status == 'RETRY':
        print('任务异常后正在重试')
    elif asy.status == 'STARTED':
        print('任务已经开始被执行')

if __name__ == '__main__':
    # 任务id,提交任务时返回的结果
    task_id = 'bb52fd1a-43e6-4c36-852c-9b1c940a1ad7'
    get(task_id)

 celery执行异步任务,延时任务,定时任务

异步任务

task.delay(*args,**kwargs)

 延时任务

task.apply_async(args=[参数,参数],eta=时间对象(utc时间))
"""
参数
    args:任务需要的参数
    countdown:几秒后执行
    retry:任务失败是否重试,默认为True
其他参数:
	eta:时间对象
"""
from celery_task.home_task import add


# 提交一个add的异步任务
#eta 是一个时间任务。 要写一个5秒后的时间对象
from datetime import datetime,timedelta
# 得到10miao后的时间,celery 默认使用utc时间
eta = datetime.utcnow()+timedelta(seconds=10)
res = add.apply_async(args=(200,20),eta=eta)
print(res)  # 9d609f6f-4d08-4b62-999c-a9466d3819e5

 定时任务

  -1 app的配置文件中配置 ,写在celery.py中

           
        	app.conf.beat_schedule = 
                'send_sms_task': 
                    'task': 'celery_task.user_task.send_sms',
                    'schedule': timedelta(seconds=5),
                    # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                    'args': ('1897334444', '7777'),
                ,
                'add_task': 
                    'task': 'celery_task.home_task.add',
                    'schedule': crontab(hour=12, minute=10, day_of_week=3),  # 每周一早八点
                    'args': (10, 20),
                
            

    -2 启动worker :干活的人
          

 celery -A celery_task worker -l info -P eventlet


        -3 启动beat :提交任务的人
         

   celery -A celery_task beat -l info

# 设置时区
app.conf.timezone ='Asia/Shanghai'
# 是否使用utc时间
app.conf.enable_utc = False

from celery.schedules import crontab
# app的配置信息
app.conf.beat_schedule = 
                'send_sms_task': 
                    'task': 'celery_task.user_task.send_sms',
                    'schedule': timedelta(seconds=5),
                    # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                    'args': ('18595992917', '7777'),
                ,
                'add_task': 
                    'task': 'celery_task.home_task.add',
                    'schedule': crontab(hour=22, minute=10, day_of_week=3),  # 每周一早八点
                    'args': (10, 20),
                
            

补充:

如果公司只想做定时任务, celery比较大,比较麻烦,一般公司会使用 pip install apscheduler

   # 每隔多长事件 
   import time
    from apscheduler.schedulers.blocking import BlockingScheduler
	# 任务
    def my_job(i):
        print (i)
    sched = BlockingScheduler()
    sched.add_job(my_job, 'interval', seconds=5,values=['学会了'])
    
    ## 按年月日
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    scheduler = BlockingScheduler()
    def my_job(text):
        print(text)
    # datetime类型(用于精确时间)
    scheduler.add_job(my_job, 'date', run_date=datetime(2022, 4, 25, 17, 30, 5), args=['测试任务'])
    
    ## 按corn
    import datetime
	from apscheduler.schedulers.background import BackgroundScheduler
    def job_func(text):
        print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])

    scheduler = BackgroundScheduler()
    # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
    scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
    
    

 django 使用celery

使用步骤:

   1 把写好的包(celery_task)复制到项目路径下
    2 在包内的celery.py 的上面加入代码

        import os
        os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
        import django
        django.setup()


   3 在django的视图类中,导入,提交任务

任务.delay()
任务.apply_async()


    4 启动worker,beat

celery -A celery_task worker -l info -P eventlet


celery -A celery_task beat -l info

celery 实现定时更新缓存

双写一致性

redis双写一致性指的是redis和数据库的数据要同时更新

我们都知道把数据库的数据暂存与redis,之后取数据都去redis中去, 这样就可以减少时间消耗,但是会出现一个问题, 数据库更i轩尼诗,redis没有更新,使用取数据取到的还是原来的值。

首先要了解数据库的数据是什么时候存到redis中的

一般来说, 前端发送Ajax请求,会先从redis中取,如果有值,则直接返回,如果没有值,就从数据库中取值并保存到redis中。

使用根据以上流程有以下解决办法

1. 先更新数据库,在更新缓存

2, 先删除缓存,在更新数据库

3,先更新数据库,在删除缓存(这种比较多)

4,定时更新缓存(隔5分钟更新一次)

定时更新缓存

视图类

class BannerView(GenericViewSet, ListModelMixin):
    queryset = models.Banner.objects.all()
    serializer_class = serializer.BannerSerializer

    def list(self, request, *args, **kwargs):
        banner_list = cache.get('banner_list')
        if banner_list:
            # redis中有值直接返回
            return Response(banner_list)
        else:
            # redis中没有值,获取数据再存入redis
            res = super(BannerView, self).list(request, *args, **kwargs)
            cache.set('banner_list', res.data)
            return res

celery.py

from celery import Celery
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
import django

django.setup()

broker = 'redis://127.0.0.1:6379/2'
backend = 'redis://127.0.0.1:6379/3'
include = [
    'celery_task.tasks'
]

app = Celery('main', broker=broker, backend=backend, include=include)
from datetime import timedelta

app.conf.beat_schedule = 
    'banner_update': 
        'task': 'celery_task.tasks.banner_update',  # 任务路径
        'schedule': timedelta(seconds=10),  # 定时
        'args': (),  # 任务参数
    

任务

@app.task
def banner_update():
    query_set = models.Banner.objects.all()
    ser = serializer.BannerSerializer(instance=query_set, many=True)
    cache.set('banner_list', ser.data)
    return True

秒杀逻辑

前端使用秒杀按钮, 

事件: 像后端秒杀接口发送请求, 发送完立马起一个定时任务, 每隔5秒,像后端查看一下是否秒杀成功, 如果没成功,定时任务继续执行, 如果成功,清空定时任务。

    handleClick() 
      this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => 
        if (res.data.code == 100) 
          let task_id = res.data.id
          this.$message(
            message: res.data.msg,
            type: 'error'
          );
          // 起个定时任务,每隔5s向后端查询一下是否秒杀成功
          let t = setInterval(() => 
            this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(
                res => 
                  if (res.data.code == 100 || res.data.code == 101)   //秒杀结束了,要么成功,要么失败了
                    alert(res.data.msg)
                    // 销毁掉定时任务
                    clearInterval(t)
                   else if (res.data.code == 102) 
                    //什么事都不干
                  
                
            )
          , 5000)


        
      )
    

后端: 

秒杀接口

提交秒杀任务

        def seckill(request):
            # 提交秒杀任务
            res = seckill_task.delay()
            return JsonResponse('code': 100, 'msg': '正在排队', 'id': str(res))

查询是否秒杀成功的接口  【根据用户传入的id,查询任务是否成功】

        def get_result(request):
            task_id = request.GET.get('id')
            res = AsyncResult(id=task_id, app=app)
            if res.successful():
                result = res.get()  # 7
                return JsonResponse('code': 100, 'msg': str(result))
            elif res.failed():
                print('任务失败')
                return JsonResponse('code': 101, 'msg': '秒杀失败')
            elif res.status == 'PENDING':
                print('任务等待中被执行')
                return JsonResponse('code': 102, 'msg': '还在排队')
            

 

双写一致性

接口增加缓存

首页轮播图接口增加缓存, 提高了接口的响应速度,提高并发量

class BannerView(GenericViewSet, CommonListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:  # 缓存里有
            print('走了缓存,速度很快')
            return APIResponse(result=result)
        else:
            # 去数据库拿
            print('走了数据库,速度慢')
            res = super().list(request, *args, **kwargs)
            result = res.data.get('result')  # code:100,msg:成功,result:[,]
            cache.set('banner_list', result)
            return res

加了缓存,如果mysql数据库变了,由于请求的都是缓存的数据,导致MySQL和redis的数据不一致, 这就涉及到了双写一致性的问题

1, 修改MySQL数据库,删除缓存,

2, 修改数据库,修改缓存

3, 定时更新缓存  》》  针对实时性不是很高的接口适合定时更新

给首页轮播图接口加了缓存,出现了双写一致性问题, 使用定时更新来解决双写一致性的问题【会存在不一致的情况,可以忽略】 使用celery定时任务

home_task.py

@app.task
def update_banner():
    # 更新缓存
    # 查询出现在轮播图的数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    ser = BannerSerializer(instance=queryset, many=True)
    # ser 中得图片,没有前面地址
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

 celery.py

app.conf.beat_schedule = 
    'update_banner': 
        'task': 'celery_task.home_task.update_banner',
        'schedule': timedelta(seconds=50),
        'args': (),
    

启动django, worker,beat

第一次访问: 查的数据库放入了缓存,以后再访问就走缓存。 一旦MySQL数据改了,缓存可能不一致。 定时更新,保持了一致

补充:

@app.task 与@shared.task的区别

他俩的作用一样, 第一个需要执行app, 第二个直接导入使用, 直接从内存中取出来app对象

Celery框架的基本使用与介绍

Celery介绍、安装、基本使用

一、Celery服务

什么是Celery:

Celery是一个简单、灵活且可靠的,处理消息的分布式系统

  • Celery可以用来做什么:
    • 异步任务
    • 定时任务
    • 延迟任务

Celery的运行原理:

  • 可以不依赖任何服务,通过自身命令,启动服务
  • celery服务为其他项目服务提供异步解决任务需求
# 注:会有两个服务同时运行
    - 项目服务
    - celery服务
    项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
    
    
\'\'\'
人是一个独立运行的服务 | 医院也是一个独立运行的服务
	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

\'\'\'

1、celery架构

  • 消息中间件:broker

    • 提交的任务【函数】都放在这里, celery本身不能提供消息中间件
    • 需要借助于第三方: redis或rabbitmq
  • 任务执行单元:worker

    • 真正执行任务的的地方,一个个进程中执行函数
  • 结果储存:backend

    • 函数return的结果都存储在这里, celery本身不提供结果存储
    • 需要借助于第三方: redis或rabbitmq

使用场景:

  • 异步执行:解决耗时任务
  • 延迟执行:解决延迟任务
  • 定时执行:解决周期任务

2、celery快速使用

Celery不支持在windows上直接运行,通过eventlet支持在win上运行

安装:

pip install celery
pip install eventlet  # windows需要安装 

快速使用:

- 1、第一步:创建一个py文件(main.py),用于实例化celery对象,编写需要执行的函数
    # 1、导入模块
    from celery import Celery

    # 2、指定briker,用于存放提交的异步任务
    broker = \'redis://127.0.0.1:6379/1\'
    # 3、指定backend,用于存放函数执行结束的结果
    backend = \'redis://127.0.0.1:6379/2\'
    # 实例化celery对象
    app = Celery(\'test\', broker=broker, backend=backend)


    # 编写一个函数,装饰上celery对象
    @app.task
    def add(a, b):
        import time
        time.sleep(3)
        print(\'add函数执行完成\')
        return a + b
       
- 2、第二步:再次创建一个py文件(run.py),用于将函数提交给celery
    # 1、导入刚才编写的函数
    from main import add

    # 2、将任务提交给broker,函数需要的参数需要传入
    res = add.delay(1, 2)
    # 3、提交后可以获得该任务的ID,可通过ID可以查询任务执行结果
    print(res)  # 0213d2c2-453e-41a8-a171-e31f1f2f4883
    
  
- 3、第三步:使用命令开启worker (也可以提前开启,任务提交后就会直接执行)
	# 启动worker命令,win需要安装eventlet
	# 启动需要进入main.py文件的目录下
	win:
		-4.x之前版本
			celery worker -A main -l info -P eventlet
		-4.x之后
			celery  -A main  worker -l info -P eventlet
 	mac:
		celery  -A main  worker -l info

- 4、第四步:worker会将执行的结果存在之前指定的broker目录下(指定的redis数据库)
	
- 5、第五步:通过代码查看执行结果(创建新的py文件,专门用于查看执行结果)
    # 1、导入celery实例的对象
    from main import app
    # 2、导入该模块用于查看结果
    from celery.result import AsyncResult

    # 3、将提交的任务编号拿过来,用于查询结果
    id = \'0213d2c2-453e-41a8-a171-e31f1f2f4883\'

    # 4、指定该文件为启动文件
    if __name__ == \'__main__\':
        # 实例化对象,将任务的ID和celery实例化对象当作参数传入
        a = AsyncResult(id=id, app=app)
        # 判断执行结果
        if a.successful():  # 执行完了
            result = a.get()
            print(result)
        elif a.failed():
            print(\'任务失败\')
        elif a.status == \'PENDING\':
            print(\'任务等待中被执行\')
        elif a.status == \'RETRY\':
            print(\'任务异常后正在重试\')
        elif a.status == \'STARTED\':
            print(\'任务已经开始被执行\')

3、celer包结构【使用包写一个小游戏】

什么是包结构:通过将celery服务封装成包的形式,放在项目需要使用的时候导入即可

project
    ├── celery_task  	  # celery包
    │   ├── __init__.py  # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py   # 所有任务函数
    ├── add_task.py  	 # 添加任务
    └── get_result.py   # 获取结果

创建包:

创建一个包,名为:celery_task

- 1、第一步:在包下创建py文件(名字必须为celery.py)
    # 导入celery模块
    from celery import Celery
    # 导入配置broker和backend
    from .settings import BACKEND, BROKER

    # 实例化celery对象
    app = Celery(\'test\',
                 broker=BROKER, 
                 backend=BACKEND,
                 include=[\'celery_task.order_task\', 
                          \'celery_task.user_task\'])

- 2、第二步:创建settings.py,用于存放配置
    BROKER = \'redis://127.0.0.1:6379/1\'
    BACKEND = \'redis://127.0.0.1:6379/2\'
    
- 3、第三步,创建py文件(task.py),用于存放需要执行的异步任务
    # 导入celery实例对象
    from .celery import app


    # 计算函数
    @app.task()
    def add(a, b):
        print(\'计算结果为:\', a + b)
        return True


    # 模拟发送短信
    @app.task()
    def send_sms(mobile, code):
        print(\'已向手机号:%s 发送短信,验证码为:%s\' % (mobile, code))
        return True
    
- 4、第四步:开启worker
	切换到celery所在的目录下,开启worker命令
	

- 5、第五步:提桥任务:
    # 导入任务
    from celery_bag.celery_task.task import send_sms, add

    # 提交任务
    def add_func(a, b):
        return add.delay(a, b)


    def send_func(mobile, code):
        return send_sms.delay(mobile, code)
    
 - 6、触发任务提交:
    import os
    import redis

    from add_task import add_func, send_func
    from get_result import res_func

    POOL = redis.ConnectionPool(max_connections=100)
    conn = redis.Redis(connection_pool=POOL)

    func_list = 
        \'1\': add_func,
        \'2\': send_func
    

    if __name__ == \'__main__\':
        while True:
            print(\'\'\'
            1、异步计算器
            2、模拟发送短信
            3、查看任务执行状态
            4、开启worker,并查看任务执行结果(需要重启系统)
            \'\'\')
            user_choice = input(\'欢迎来到celery测试系统,请输入您需要执行的功能编号>>>:\').strip()
            while user_choice in func_list.keys():
                if user_choice == \'1\':
                    print(\'已进入异步计算器功能!\')
                    a = \'请输入数字 1 \'
                    b = \'请输入数字 2 \'
                else:
                    print(\'您已进入模拟发送短信功能\')
                    a = \'手机号\'
                    b = \'短信内容 \'
                    args_1 = input(\'请输入%s>>>:\' % a).strip()
                    args_2 = input(\'请输入%s>>>:\' % b).strip()
                    args_1 = int(args_1)
                    args_2 = int(args_2)
                    res = func_list.get(user_choice)(args_1, args_2)
                    input(\'\'\'
                        任务提交完成!
                        请记录本次任务ID:%s
                        任意键返回上一层
                               \'\'\' % res)
                    break
                    if user_choice == \'3\':
                        id = input(\'请输入任务ID>>>:\')
                        res = res_func(id)
                        print(res)
                        continue
                        while user_choice == \'4\':
                            try:
                                os.system(\'CD D:\\djangoProject\\luffy_api\\celery_bag\\celery_task\')
                                os.system(\'celery  -A  celery_task worker -l info -P eventlet\')
                            except Exception as e:
                                print(\'开始失败,出现错误,请重启系统\')
                                print(str(e))
                                break
                                print(\'自动返回上一层\')
                                break
                            else:
                                print(\'编号输入有误,请重写输入\')
                                continue

                                
- 7、第七步:查看任务执行结果:
    # 导入celery实例
    from celery_task.celery import app
    from celery.result import AsyncResult


    def res_func(id):
        id = id
        a = AsyncResult(id=id, app=app)
        if a.successful():  # 执行完了
            result = a.get()
            if result: return \'执行完成\'
        elif a.failed():
            return \'任务失败,失败的原因可能是未开启worker\'
        elif a.status == \'PENDING\':
            return \'任务等待中被执行,当前任务较多或未开启worker\'
        elif a.status == \'RETRY\':
            return \'任务异常后正在重试\'
        elif a.status == \'STARTED\':
            return \'任务已经开始被执行,请稍后查询\'

以上是关于celery介绍的主要内容,如果未能解决你的问题,请参考以下文章

在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存

python测试开发django-157.celery异步与redis环境搭建

Django中使用Celery实现定时任务(用djcelery)

celery+django+redis使用介绍

Celery+python+redis异步执行定时任务

Python celery和Redis入门安装使用(排难帖)