Django 模型和 Celery 周期性任务

Posted

技术标签:

【中文标题】Django 模型和 Celery 周期性任务【英文标题】:Django models and Celery periodic tasks 【发布时间】:2019-02-16 15:12:44 【问题描述】:

我正在使用 Django 进行物联网项目。我不喜欢做繁琐的编码。这里的问题是我有一个这样的模型名称:

class Period(models.Model):
      number = models.PositiveIntegerField(primary_key=True)
      start_time = models.TimeField()
      end_time = models.TimeField()

此外,我希望我的 Celery beat 在 Period.end_time 做一些事情,我添加了这段代码。 mysite/app/tasks.py 中的代码。

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    all_periods = Period.objects.all()
    for period in all_periods:
        hour = period.end_time.hour
        minute = period.end_time.minute
        sender.add_periodic_task(
            crontab(hour=hour, minute=minute),
            do_some_thing.s()
        )
@task
def do_some_thing():
      #do_some_thing

这是其他文件:

#mysite/mysite/celery.py
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'smartschool.settings')
app = Celery('smartschool')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: 0!r'.format(self.request))

#mysite/mysite/__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__=['celery_app']

#mysite/mysite/settings.py ##Celery part.
CELERY_BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_ACCEPT_COTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Ho_Chi_Minh'
CELERY_IMPORT = ('timetable.tasks')
CELERY_BEAT_SCHEDULE = 
    #'test':
    #
    #    'task': 'timetable.tasks.hello',
    #    'schedule': 10.0,
    #,
    'system_on':
    
        'task': 'timetable.tasks.system_on',
        'schedule': crontab(hour=7, minute=0)
    ,
    'system_off':
    
        'task': 'timetable.tasks.system_off',
        'schedule': crontab(hour=17, minute=30)
    ,

添加到 CELERY_SHEDULE_BEAT 的周期性任务运行良好,但 add_periodic_task 函数添加的任务没有。英语不是我的母语;请原谅我帖子中的任何错误。

【问题讨论】:

我认为您的任务定义有误,请查看此文档:docs.celeryproject.org/en/latest/userguide/… 哎呀,我忘了!让我编辑帖子 对不起,我的健忘,我在项目中的代码通过了这些参数。 【参考方案1】:

您可以在您的 crontab 任务中使用 @periodic_task 装饰器,并且在您的项目运行后您应该运行此代码。 celery -A YOURPROJETNAME worker -l -b info

【讨论】:

如果我想在循环中运行 add_periodic_task 以根据 Period 实例自动添加周期性任务,我应该将 @periodic_task 装饰器放在哪里?顺便说一句,我使用的是 Celery 4.2.1。 为您的 setup_periodic_tasks 函数添加【参考方案2】:

这也是运行 crontab 作业的另一种方式。

@task
def my_task():
   //your code 

还有你的celery.py 文件

app.conf.beat_schedule = 
    'my_task': 
        'task': 'Path.tasks.my_task',
        'schedule': crontab(minute='*/5'),
        'args': []
    ,

【讨论】:

【参考方案3】:

我想我已经想通了。通过将@task.on_after_configure.connect 更改为@task.on_after_finalize.connect,add_periodic_task 函数可以工作,但是在我有 11 个 Period 实例时只添加了一个任务!

【讨论】:

以上是关于Django 模型和 Celery 周期性任务的主要内容,如果未能解决你的问题,请参考以下文章

Django + Celery:如何将带有参数的任务链接到周期性任务

在 celery 3.1 中,制作 django 周期性任务

芹菜节拍不接周期性任务

markdown Django的Celery 4周期性任务

Celery异步任务队列/周期任务+ RabbitMQ + Django

Django celery beat 在 Elastic Beanstalk 上看不到周期性任务