celery-2

Posted weiyiming007

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery-2相关的知识,希望对你有一定的参考价值。

1、celery执行定时任务

1)几点几分执行:

celery_task-s1.py

from celery import Celery
import time

# redis不加密码
broker=redis://127.0.0.1:6379/0
backend=redis://127.0.0.1:6379/1

app=Celery(test,backend=backend,broker=broker)

@app.task  # @app名字.task
def add(x,y):
    time.sleep(2)
    return x+y


add_task.py

# 2020/2/21 22:38:30开始执行任务
from datetime import datetime
v1 = datetime(2020, 3, 21, 22, 38, 30)
print(v1)

v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)

# 取出要执行任务的时间对象,调用apply_async方法,args是任务函数的参数,eta是执行的时间
result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2)
print(result.id)


此时启动work和broker,程序会等到指定的那个时间才会执行;


2)多长时间后执行

add_task.py

from datetime import datetime

ctime = datetime.now()

# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta

#取10秒之后的时间对象
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并设定时间
result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time)
print(result.id)


此时启动work和broker,程序会等到10秒后才会执行;


2、执行计划任务

类似于contab的定时任务


案例, (用的是多任务机构):

celery.py

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

backend=redis://127.0.0.1:6379/0
broker=redis://127.0.0.1:6379/1

app=Celery(test,broker=broker, backend=backend,
           # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
           include=[celery_task.order_task,
                    celery_task.user_task
           ])


# 字典里可以写多个不同的任务
app.conf.beat_schedule = {
    # 名字随意命名
    add-every-10-seconds: {
        # 执行order_task下的test_celery函数
        task: celery_task.order_task.order_add,
        # 每隔10秒执行一次
        # ‘schedule‘: 1.0,
        # ‘schedule‘: crontab(minute="*/1"),
        schedule: timedelta(seconds=10),
        # 给任务函数传递参数
        args: (5, 6)
    },
    # ‘add-every-12-seconds‘: {
    #     ‘task‘: ‘celery_task.order_task.order_add‘,
    #     # 每年3月21号,23点15分执行
    #     ‘schedule‘: crontab(minute=15, hour=23, day_of_month=21, month_of_year=3),
    #     ‘args‘: (16, 16)
    # },

}


order_task.py

import time
from celery_task.celery import app

@app.task
def order_add(x, y):
    time.sleep(1)
    return x + y


此时需要先启动一个beat:celery beat -A celery_task -l info

前面都是手动提交任务,这个beat会自动帮我们提交任务,而且周期是我们任务里指定的,我这里是10秒,它就会10秒提交一次;

然后启动work,celery worker -A celery_task -l info -P eventlet    

work就会每隔10秒接收一次任务,并一直循环执行;


3、django中使用celery

直接把多任务结构copy过来,使用即可;

技术图片

celery_task是在项目目录下;


celery.py

from celery import Celery

backend=redis://127.0.0.1:6379/0
broker=redis://127.0.0.1:6379/1

app=Celery(test,broker=broker, backend=backend,
           # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
           include=[celery_task.order_task,
                    celery_task.user_task
           ])


user_task.py

import time
from celery_task.celery import app

@app.task
def user_add(x, y):
    time.sleep(1)
    return x + y


views.py

from django.shortcuts import render, HttpResponse
from celery_task.user_task import user_add
from celery.result import AsyncResult
from celery_task.celery import app

# Create your views here.
def index(request):
    result = user_add.delay(8, 9)
    return HttpResponse(result.id)


def check_result(request):
    res = request.GET.get(id)
    async = AsyncResult(id=res, app=app)
    if async.successful():
        # 取出return值
        result = async.get()
        print(result)
        return HttpResponse(任务执行完成)


urls.py

from django.conf.urls import url
from django.contrib import admin
from app01 import views

urlpatterns = [
    url(r^admin/, admin.site.urls),
    url(r^index/, views.index),
    url(r^check/, views.check_result),
]


需要先启动work: celery worker -A celery_task -l info -P eventlet

然后启动django,在浏览器中访问:

技术图片 

技术图片

先访问了index,index视图函数中执行了任务;

然后拿到任务id,再访问check手动查询结果;


注意:
在celery的任务函数中不能直接调用django的环境,比如要使用ORM就不行,需要手动添加django环境才行;

import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "BMS.settings")
    import django     # 加载django
    django.setup()   # 启动django

    from app01 import models  # 这样就能使用ORM了
    ......


4、django-celery

其实除了原生的celery模块,django也提供了一个django-celery模块,但据说不太好用;

如果使用到了再看吧;

以上是关于celery-2的主要内容,如果未能解决你的问题,请参考以下文章

celery:celery介绍架构基本使用,celery执行异步任务延迟任务定时任务,django中使用celery。

django 2.x + celery 4.2.x 配置文件 设置

Celery包结构应用

celery介绍

如何判断任务是不是已经在 django-celery 中排队?

python测试开发django-196.python3.8+django2+celery5.2.7环境准备