django+redis+celery(beat)发布定时任务

Posted 月月coding

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了django+redis+celery(beat)发布定时任务相关的知识,希望对你有一定的参考价值。

一顿操作各种无脑安装:

# 环境: Ubuntu 18.04.1 LTS x86_64 django2.0   Python 3.6.9 (default, Apr 18 2020, 01:56:04) 
# 安装celery
pip3  install celery==4.4.6
pip3  install django_celery_beat   # dj和celery结合,进行定时任务的执行
若出现ERROR: django-timezone-field 4.0 has requirement django>=2.2, but youll have django 2.0 which is incompatible.
    pip3 install django_celery_beat==1.6.0  # 可以安装1.6.0版本,以上报错问题解决

# 按道理讲,pip3安装好celery后就应该可以直接在终端celery了,但是终端可能会说没有这个命令:
sudo apt install python-celery-common  # 解决

# ubuntu安装redis
sudo apt install redis-server #  Redis version=4.0.9

pip3 install redis==3.4.1

 

 

创建django项目:

django-admin startproject mypro

创建mypro的app:

cd mypro
python3 manage.py startapp app01

目录结构如下:

.
└── mypro
    ├── app01
    │   ├── admin.py
    │   ├── apps.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   └── views.py
    ├── manage.py
    └── mypro
        ├── __init__.py
        ├── __pycache__
        │   ├── __init__.cpython-36.pyc
        │   └── settings.cpython-36.pyc
        ├── settings.py
        ├── urls.py
        └── wsgi.py

 

根据路径,修改文件内容如下:

# mypro/mypro/__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

# __all__ = (‘celery_app‘,)
__all__ = [celery_app,]

其中的  from __future__ import absolute_import  是为了确保绝对路径导入的.

创建文件celery.py(本项目中以下代码只需要修改标出的两处即可,如果有其他需求请查看官方文档,其他位置代码同理.):

# mypro/mypro/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
 
# set the default Django settings module for the ‘celery‘ program.
os.environ.setdefault(DJANGO_SETTINGS_MODULE, mypro.settings) # 修改这一行将mypro换成你的项目名称
 
app = Celery(mypro) # 修改这一行将mypro换成你的项目名称
 
# Using a string here means the worker don‘t have to serialize
# the configuration object to child processes.
# - namespace=‘CELERY‘ means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(django.conf:settings, namespace=CELERY)
 
# Load task modules from all registered Django app configs.
# 自动识别每个app中的celery
app.autodiscover_tasks()
 
 
@app.task(bind=True)
def debug_task(self):
    print(Request: {0!r}.format(self.request))

 

 修改settings.py:

# mypro/mypro/settings.py

# 以下为需要去修改的部分:
# 注:和celery相关的设置都必须要以"CELERY_"开头,并大写.
from datetime import timedelta from celery.schedules import crontab DEBUG = False ALLOWED_HOSTS = ["*"] # 强烈不推荐这样设置生产环境,会更容易成为攻击目标. INSTALLED_APPS = [ ... app01, django_celery_beat ] # fro 本地celery 此时你应该装好redis了. CELERY_BROKER_URL=redis://127.0.0.1:6379/0 # 最后的0是指redis的第0个数据库.redis默认有16个数据库. CELERY_RESULT_BACKEND=redis://127.0.0.1:6379/0 # fro 本地celery end # celery beat相关 CELERY_BEAT_SCHEDULE = { "test1": { # "task": "mypro.app01.tasks.mul", #执行的函数.这里是绝对引入的方式,INSTALLED_APPS中注册app01时要这样注册:"mypro.app01",不过试了一下这样不行,worker启动报错. "task": "app01.tasks.mul", #执行的函数,这里要注意引入方式,要和tasks.py文件的归属app在INSTALLED_APPS中注册app的引入方式一致,这里使用的是相对引入. "schedule": timedelta(seconds=5), # every minute 每分钟执行 "args": (2,3) # # 任务函数参数 }, "test2": { "task": "app01.tasks.add", # "schedule": crontab(minute=0, hour="*/1"), # every minute 每小时执行 "schedule": crontab(minute="*/1"), # every minute 每小时执行 "args": (23,43) }, } # celery beat相关end

执行一下:

# 在项目目录mypro/下执行该命令
python3 manage.py migrate

 

 创建文件task.py:

# mypro/app01/tasks.py
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task # 这个shared_task可以让你的任务在任何app中被调用.


@shared_task
def add(x, y):
    return x + y
 
 
@shared_task
def mul(x, y):
    return x * y

 最后的项目目录结构(__pycache__目录及其拥有的文件都删掉了.不用管.):

.
└── mypro
    ├── app01
    │   ├── admin.py
    │   ├── apps.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tasks.py
    │   ├── tests.py
    │   └── views.py
    ├── db.sqlite3
    ├── manage.py
    └── mypro
        ├── celery.py
        ├── __init__.py
        ├── settings.py
        ├── urls.py
        └── wsgi.py

 

 此时,django项目mypro已经具备定时任务功能,可以将django和celery的功能结合起来使用了.

结合使用比如:在django中进行ORM查询,将结果以email的方式定时发送给用户;用户定时抢购商品,发送通知;这样的场景下需要django和celery都开启.

以上代码构建完,可以只使用celery的功能进行测试了,因为没有用到django的调用,所以本文档中构建的项目不用启动django.本项目中只有加法和乘法两个task在celery运行(若需要配合django运行,请在app下的tasks.py下写你的task,然后在mypro/settings.py的CELERY_BEAT_SCHEDULE中添加任务.):

打开两个终端,一个生产者一个消费者:

在项目目录mypro/下终端输入:

celery -A mypro beat -l info -S django
celery -A mypro worker -l info

 运行正常,可以结合django深入搞一搞了.

 

 参考官方文档(注意celery的版本,本文中使用的celery版本为4.4.6):

Using Celery with Django

https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html?highlight=celery.py#using-the-shared-task-decorator

拓展  守护进程:Daemonization

https://docs.celeryproject.org/en/latest/userguide/daemonizing.html#daemonization

(说到守护,不得不提到supervisor,可自行查阅相关信息)


若仅仅是为了将他们结合起来,并让他们愉快地跑起来,那么下面的也许就不用看了.

思路拓展:

前端异步生成内容:前端用户点击生成查询一组信息,后端异步执行任务(使用redis存储).后端给前端ajax发送一个类似"39xxxxxa-8xx5-xxxx-xxxx-exxxxxxxxxxf"代码,前端ajax收到代码,设置定时器,向后端请求代码对应的结果,后端有了结果以后等待前端ajax的请求,收到请求后,后端马上将代码对应的信息返回给前端.这样前端就不必等待太久时间卡在原地,不影响用户体验.

 

__pycache__

以上是关于django+redis+celery(beat)发布定时任务的主要内容,如果未能解决你的问题,请参考以下文章

在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存

python测试开发django-160.Celery 定时任务 (beat)

django定时器_djcelery+mq的使用

celery beat 没有发送消息(使用 django-celery-beat)

django_celery_beat

django-celery-beat 垃圾邮件到期任务