celery定时任务

Posted tangjian219

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery定时任务相关的知识,希望对你有一定的参考价值。

Celery

介绍

celery,处理任务的Python的模块。celery是一个基于Python开发的模块,可以帮助我们对任务进行分发和处理。 ying

应用场景1

对【耗时的任务】,通过celery,将任务添加到broker(队列),然后立即给用户返回一个任务ID。
当任务添加到broker之后,由worker去broker获取任务并处理任务。
任务完成之后,再将结果放到backend中

用户想要检查结果,提供任务ID,就可以去backend中去查找。

应用场景2

定时任务(定时发布/定时拍卖)

环境搭建

pip3 install celery==4.4
安装broker: redis或rabbitMQ
pip3 install redis / pika

快速使用

准备

安装好celery所涉及的模块
创建一个django程序(也可以是普通的py文件,也可以不用django),我以django举例
新建了一个文件夹:celery_examples,在文件夹中分别创建s1.py/s2.py/s3.py   3个py文件

s1.py

from celery import Celery

app = Celery('tasks',broker='redis://127.0.0.1:6379',backend='redis://127.0.0.1:6379')
#broker是存放任务的redis队列   backend是存放任务结果的队列

# 任务函数
@app.task
# 定义了一个函数,被celery的app装饰,就可以被当做celery的任务
def x1(x,y):
    return x+y

@app.task
def x2(x,y):
    return x-y

s2.py

from s1 import x1

# 将任务放入任务队列
result = x1.delay(4,4)
# delay是celery中的一个方法,意思就是把x1函数以及函数所需的参数放到任务队列broker中

print(result.id)
# result.id就是任务id  ec465c86-e4cb-4ca8-a12a-440c3792f998

s3.py

from celery.result import AsyncResult
from celery_examples.s1 import app

# 查询任务
# result = AsyncResult(id="s2.py中的任务id",app=app)
result_object = AsyncResult(id="ec465c86-e4cb-4ca8-a12a-440c3792f998",app=app)

print(result_object.status)
# 查询任务状态  SUCCESS

print(result_object.get())
# 8  获取任务结果

运行程序

1.先启动redis
2.启动worker
# 进入当前目录,就是存放3个py文件的目录中
celery worker -A s1 -l info # s1为带任务函数的文件名   -l info 为输出日志,项目上线时就可以不用写
执行完后,在到s2.py文件中,右键运行s2.py文件,此时再看pycharm的Terminal终端会报错

# windows报的错
Traceback (most recent call last):
  File "d:wupeiqipy_virtual_envsauctionlibsite-packagesilliardpool.py", line 362, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:wupeiqipy_virtual_envsauctionlibsite-packagesceleryapp	race.py", line 546, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

# 解决报错
# 终止程序,安装eventlet,再次执行worker
pip install eventlet

celery worker -A s1 -l info -P eventlet
执行完s2.py后会得到一个任务id
将任务id填写到s3.py中,再次运行s3.py,得到任务的结果 SUCCESS

django中应用celery

第一步:【项目/项目/settings.py 】添加配置

CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_TASK_SERIALIZER = 'json'

第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py

import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings') #为了找到配置文件,读取配置文件

app = Celery('celery_test') # 随意起个名字,建议和项目同名

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 去每个已注册app中读取 tasks.py 文件
app.autodiscover_tasks()

第三: 【项目/app名称/tasks.py】

from celery import shared_task

@shared_task
# 把这个函数加上这个装饰器就会把这个函数变成任务
def x1(x, y):
    return x + y

@shared_task
def x2(x, y):
    return x * y

第四步: 【项目/项目/__init__.py

from .celery import app as celery_app

__all__ = ('celery_app',)

第五步: 启动worker

# 先进入项目 celery_test为最外层的项目名
C:python-Djangocelery_test> celery worker -A celery_test -l info -P eventlet

第六步: 编写django的路由

from django.conf.urls import url
from django.contrib import admin
from celery_app import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^create/task/$', views.create_task), #创建任务路由
    url(r'^get/result/$', views.get_result),   #获取任务结果路由
]

第七步: 编写django视图函数

from django.shortcuts import render,HttpResponse
from celery_app import tasks
from celery.result import AsyncResult
from celery_test import celery_app


def create_task(request):
    result = tasks.x1.delay(2,2) #任务 
    return HttpResponse(result.id)


def get_result(request):
    nid = request.GET.get('nid') # nid为任务id,执行create_task会获取,然后拼接到获取结果的路由中
    # from demos.celery import app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)
    data = result_object.get()
    return HttpResponse(data)

# 启动django
# 访问 http://127.0.0.1:8000/create/task/ 获取任务id
# 访问 http://127.0.0.1:8000/get/result/?nid=e751af7b-2480-4023-b997-943171794d3c   id拼接获取结果

celery定时任务

基于上面的代码(django中应用celery的代码)做简单修改就可以定时执行任务

修改内容是django的视图函数,修改为

import datetime
from django.shortcuts import render,HttpResponse
from celery_app import tasks
from celery.result import AsyncResult
from celery_test import celery_app

# Create your views here.
def create_task(request):
    # 立即执行
    # result = tasks.x1.delay(2,2)

    # 定时执行
    ctime = datetime.datetime.now() #本地时间
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) #本地时间转化为UTC时间
    target_time = utc_ctime + datetime.timedelta(seconds=10) # 10秒后执行
    result = tasks.x1.apply_async(args=[11,3],eta=target_time) # 任务是x1,参数是args=[11,3],定时时间为target_time
    return HttpResponse(result.id)


def get_result(request):
    nid = request.GET.get('nid')
    # from demos.celery import app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)
    if result_object.status == 'SUCCESS':
        data = result_object.get() #取完数据之后还会留在结果队列中
        result_object.forget()  #将结果从结果队列中移除
        # result_object.revoke(terminate=True) #强行将任务干掉
        return HttpResponse(data)
    elif result_object.status == 'FAIL': #失败做响应的操作,忘了是FAIL还是其他单词
        pass #响应操作

    

# 再次启动worker ---> celery worker -A celery_test -l info -P eventlet
# 启动django
# 访问 http://127.0.0.1:8000/create/task/ 获取任务id

此时的终端显示

技术图片

以上是关于celery定时任务的主要内容,如果未能解决你的问题,请参考以下文章

Bamboo Django Celery定时任务和时间设置

Celery Beat定时任务

Django-Python3-Celery 异步任务/定时任务

Celery学习--- Celery操作之定时任务

CELERY 定时任务

celery 定时任务