Celery 收下这捆芹菜!

Posted bigb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery 收下这捆芹菜!相关的知识,希望对你有一定的参考价值。

Celery简介

Celery是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)

Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery构成

Task

任务模块, 包含异步任务和定时任务, 异步任务通常在业务逻辑中被触发并发往Broker任务队列, 定时任务由Celery Beat 进程周期性地发往Broker任务队列

Broker

消息中间件, 就是任务调度队列, 用来接收任务, 将任务存储到队列中, 就像是生产者与消费者模型中的队列一样
Celery本身不提供Broker, 官方推荐使用RabbitMQ和Redis

Worker

任务的执行单元, 它实时监控Broker队列, 获取队列中的任务, 并执行, 可以看做是生产者与消费者模型中的消费者

Backend

任务执行结果的存储单元, 用来存储任务结果, 以便查询
Celery本身不提供Backend, 官方推荐使用RabbitMQ和Redis

技术图片

Celery使用

安装

python安装Celery: pip install celery
我们使用Redis作为Broker和Backend, 因此确保你的设备配置了Redis环境

基本使用

"""
project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须是celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 添加任务
    └── get_result.py   # 获取结果

"""

异步任务: delay

"""
1.执行add_task.py将任务添加到队列
2.cmd切换至所在文件目录celery_task下运行worker: 
    >: celery worker -A celery_task -l info -P eventlet
3.执行get_result.py获取任务结果

"""
# celery.py
from celery import Celery

# 配置消息中间件, 用来接收任务
broker = 'redis://127.0.0.1:6379/0'

# 配置backend, 用来存储任务执行结果
backend = 'redis://127.0.0.1:6379/1'

# worker, 任务执行单元
app = Celery(broker=broker, backend=backend)
# tasks.py
from .celery import app

# 定义任务
@app.task
def add(x, y):
    res = x + y
    print(f'{x}+{y}={res}')
    return res


@app.task
def minus(x, y):
    res = x - y
    print(f'{x}-{y}={res}')
    return res
# add_task.py
from .tasks import add, minus

# 在业务逻辑中触发异步任务
add_results = add.delay(10, 20)

# 任务执行结果的id
print(add_results.id)
# get_result.py
from .celery import app
from .add_tasks import add_results
from celery.result import AsyncResult

if __name__ == '__main__':
    # 获取异步任务结果对象, 参数:id, app
    async = AsyncResult(id=add_results.id, app=app)
    
    if async.successful():
        result = async.get()
        print('任务执行成功')
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

延迟任务: apply_async

from celery_task.tasks import add
from datetime import timedelta, datetime

# 添加延时任务, 10秒后执行
add_results = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(seconds=10))

# 任务执行结果的id
print(add_results.id)

周期任务: beat_schedules

# celery.py
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

# 配置消息中间件, 用来接收任务
broker = 'redis://127.0.0.1:6379/0'

# 配置backend, 用来存储任务执行结果
backend = 'redis://127.0.0.1:6379/1'

# worker, 任务执行单元
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 时区
app.conf.timezone = 'Asia/Shanghai'

app.conf.enable_utc = False

app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_task.tasks.add',
        # 每10秒添加一次任务
        'schedule': timedelta(seconds=10),
        'args': (10, 20)
    },
    'minus-task': {
        'task': 'celery_task.tasks.minus',
        # 每周一八点半添加一次任务
        'schedule': crontab(hour=8, minute=30, day_of_week=1),
        'args': (20, 10)
    }
    
}

Django配置Celery

在根目录下创建包文件夹 celery_task

"""
project
    ├── celery_task     # celery包
        ├── __init__.py # 包文件
        ├── celery.py   # celery连接和配置相关文件,且名字必须是celery.py
        └── tasks.py    # 所有任务函数   
"""

celery.py

# 1.加载Django配置环境
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'luffyapi.settings.dev')

# 2.加载Celery配置环境
from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks.py'])

# 时区
app.conf.timezone = 'Asia/Shanghai'
# UTC
app.conf.enable_utc = False


from datetime import timedelta
app.conf.beat_schedules = {
    'update-banner-cache': {
        'task': 'celery_task.tasks.py.update_banner_cache',
        # 每10秒添加一次
        'schedule': timedelta(seconds=10),
        'args': ()
    }
}

task.py

from .celery import app
from home.models import Banner
from home.serializers import BannerModerSerializer
from django.conf import settings
from django.core.cache import cache


@app.task
def update_banner_cache():
    print('lalal')
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-order').all()
    serializer_obj = BannerModerSerializer(data=banner_query, many=True)
    banner_data = serializer_obj.data
    for banner in banner_data:
        banner['image'] = settings.BASE_URL + banner.get('image')
    cache.set('banner_cache', banner_data)
    return True

以上是关于Celery 收下这捆芹菜!的主要内容,如果未能解决你的问题,请参考以下文章

开始芹菜:AttributeError:“模块”对象没有属性“芹菜”

芹菜自动重载不起作用

芹菜中的导入错误

模块“模块”没有属性“芹菜”

芹菜。运行单个芹菜节拍 + 多个芹菜工人规模

芹菜任务和自定义装饰器