django+redis+celery(beat)发布定时任务
Posted 月月coding
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了django+redis+celery(beat)发布定时任务相关的知识,希望对你有一定的参考价值。
一顿操作各种无脑安装:
# 环境: Ubuntu 18.04.1 LTS x86_64 django2.0 Python 3.6.9 (default, Apr 18 2020, 01:56:04) # 安装celery pip3 install celery==4.4.6 pip3 install django_celery_beat # dj和celery结合,进行定时任务的执行 若出现ERROR: django-timezone-field 4.0 has requirement django>=2.2, but you‘ll have django 2.0 which is incompatible. pip3 install django_celery_beat==1.6.0 # 可以安装1.6.0版本,以上报错问题解决 # 按道理讲,pip3安装好celery后就应该可以直接在终端celery了,但是终端可能会说没有这个命令: sudo apt install python-celery-common # 解决 # ubuntu安装redis sudo apt install redis-server # Redis version=4.0.9 pip3 install redis==3.4.1
创建django项目:
django-admin startproject mypro
创建mypro的app:
cd mypro
python3 manage.py startapp app01
目录结构如下:
. └── mypro ├── app01 │ ├── admin.py │ ├── apps.py │ ├── __init__.py │ ├── migrations │ │ └── __init__.py │ ├── models.py │ ├── tests.py │ └── views.py ├── manage.py └── mypro ├── __init__.py ├── __pycache__ │ ├── __init__.cpython-36.pyc │ └── settings.cpython-36.pyc ├── settings.py ├── urls.py └── wsgi.py
根据路径,修改文件内容如下:
# mypro/mypro/__init__.py from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # __all__ = (‘celery_app‘,) __all__ = [‘celery_app‘,]
其中的 from __future__ import absolute_import 是为了确保绝对路径导入的.
创建文件celery.py(本项目中以下代码只需要修改标出的两处即可,如果有其他需求请查看官方文档,其他位置代码同理.):
# mypro/mypro/celery.py from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the ‘celery‘ program. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘mypro.settings‘) # 修改这一行将mypro换成你的项目名称 app = Celery(‘mypro‘) # 修改这一行将mypro换成你的项目名称 # Using a string here means the worker don‘t have to serialize # the configuration object to child processes. # - namespace=‘CELERY‘ means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object(‘django.conf:settings‘, namespace=‘CELERY‘) # Load task modules from all registered Django app configs. # 自动识别每个app中的celery app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(‘Request: {0!r}‘.format(self.request))
修改settings.py:
# mypro/mypro/settings.py # 以下为需要去修改的部分:
# 注:和celery相关的设置都必须要以"CELERY_"开头,并大写.
from datetime import timedelta from celery.schedules import crontab DEBUG = False ALLOWED_HOSTS = ["*"] # 强烈不推荐这样设置生产环境,会更容易成为攻击目标. INSTALLED_APPS = [ ... ‘app01‘, ‘django_celery_beat‘ ] # fro 本地celery 此时你应该装好redis了. CELERY_BROKER_URL=‘redis://127.0.0.1:6379/0‘ # 最后的0是指redis的第0个数据库.redis默认有16个数据库. CELERY_RESULT_BACKEND=‘redis://127.0.0.1:6379/0‘ # fro 本地celery end # celery beat相关 CELERY_BEAT_SCHEDULE = { "test1": { # "task": "mypro.app01.tasks.mul", #执行的函数.这里是绝对引入的方式,在INSTALLED_APPS中注册app01时要这样注册:"mypro.app01",不过试了一下这样不行,worker启动报错. "task": "app01.tasks.mul", #执行的函数,这里要注意引入方式,要和tasks.py文件的归属app在INSTALLED_APPS中注册app的引入方式一致,这里使用的是相对引入. "schedule": timedelta(seconds=5), # every minute 每分钟执行 "args": (2,3) # # 任务函数参数 }, "test2": { "task": "app01.tasks.add", # "schedule": crontab(minute=0, hour="*/1"), # every minute 每小时执行 "schedule": crontab(minute="*/1"), # every minute 每小时执行 "args": (23,43) }, } # celery beat相关end
执行一下:
# 在项目目录mypro/下执行该命令 python3 manage.py migrate
创建文件task.py:
# mypro/app01/tasks.py # Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task # 这个shared_task可以让你的任务在任何app中被调用. @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y
最后的项目目录结构(__pycache__目录及其拥有的文件都删掉了.不用管.):
.
└── mypro
├── app01
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ ├── tests.py
│ └── views.py
├── db.sqlite3
├── manage.py
└── mypro
├── celery.py
├── __init__.py
├── settings.py
├── urls.py
└── wsgi.py
此时,django项目mypro已经具备定时任务功能,可以将django和celery的功能结合起来使用了.
结合使用比如:在django中进行ORM查询,将结果以email的方式定时发送给用户;用户定时抢购商品,发送通知;这样的场景下需要django和celery都开启.
以上代码构建完,可以只使用celery的功能进行测试了,因为没有用到django的调用,所以本文档中构建的项目不用启动django.本项目中只有加法和乘法两个task在celery运行(若需要配合django运行,请在app下的tasks.py下写你的task,然后在mypro/settings.py的CELERY_BEAT_SCHEDULE中添加任务.):
打开两个终端,一个生产者一个消费者:
在项目目录mypro/下终端输入:
celery -A mypro beat -l info -S django celery -A mypro worker -l info
运行正常,可以结合django深入搞一搞了.
参考官方文档(注意celery的版本,本文中使用的celery版本为4.4.6):
Using Celery with Django
https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html?highlight=celery.py#using-the-shared-task-decorator
拓展 守护进程:Daemonization
https://docs.celeryproject.org/en/latest/userguide/daemonizing.html#daemonization
(说到守护,不得不提到supervisor,可自行查阅相关信息)
若仅仅是为了将他们结合起来,并让他们愉快地跑起来,那么下面的也许就不用看了.
思路拓展:
前端异步生成内容:前端用户点击生成查询一组信息,后端异步执行任务(使用redis存储).后端给前端ajax发送一个类似"39xxxxxa-8xx5-xxxx-xxxx-exxxxxxxxxxf"代码,前端ajax收到代码,设置定时器,向后端请求代码对应的结果,后端有了结果以后等待前端ajax的请求,收到请求后,后端马上将代码对应的信息返回给前端.这样前端就不必等待太久时间卡在原地,不影响用户体验.
__pycache__
以上是关于django+redis+celery(beat)发布定时任务的主要内容,如果未能解决你的问题,请参考以下文章
在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存
python测试开发django-160.Celery 定时任务 (beat)