Celery Crontab 中的输出日期调度到 Django 模板

Posted

技术标签:

【中文标题】Celery Crontab 中的输出日期调度到 Django 模板【英文标题】:Output Dates in Celery Crontab Schedule to Django Template 【发布时间】:2021-07-15 01:28:54 【问题描述】:

我将 Celery 用于我的 Django 项目,并且我已经安排了一些 crontab 任务以在特定时间向用户发送电子邮件。我需要在 html/Django 模板中输出一个时间表,显示用户可以期待电子邮件发出的日期。我的 Celery 的 crontab 时间表如下所示:

import os
from celery import Celery
from celery.schedules import crontab

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print("Request: 0!r".format(self.request))

first_monday_april_to_july = crontab(
    minute=15, hour=6, day_of_week="monday", day_of_month="1-7", month_of_year="4,5,6,7"
)
august_every_monday_first_two_weeks = crontab(
    minute=15, hour=6, day_of_week="monday", day_of_month="1-14", month_of_year="8"
)
august_weekdays_second_two_weeks = crontab(
    minute=15, hour=6, day_of_week="mon-fri", day_of_month="15-31", month_of_year="8"
)

app.conf.beat_schedule = 
    "report1": 
        "task": "email_report",
        "schedule": first_monday_april_to_july,
    ,
    "report2": 
        "task": "email_report",
        "schedule": august_every_monday_first_two_weeks,
    ,
    "report3": 
        "task": "email_report",
        "schedule": august_weekdays_second_two_weeks,

我希望能够从这些任务中获取发送电子邮件的所有日期,并将它们保存到视图中的列表中,然后将其发送到模板以显示在表格中。这样的事情可能吗?到目前为止,我还没有找到一种方法来做到这一点。

【问题讨论】:

您的意思是,您需要计算下一个执行日期是什么? meybe this package 可以帮助您。 PeriodicTask 此 django 应用程序中定义的任务模型包含有关定期 celery 任务的信息。您可以将此模型的查询集传递给上下文并在模板中使用。 @VictorErmakov 是的,它需要显示任务的所有执行日期。 我想,你需要得到这个包github.com/kiorky/croniter 【参考方案1】:

我最终创建了一个函数来获取任务中的所有日期,然后将它们全部附加到计划列表中,然后通过视图将其传递给模板以在表格中输出。它看起来像这样:

from datetime import date, datetime, timedelta
from .celery import (
    first_monday_april_to_july,
    august_every_monday_first_two_weeks,
    august_weekdays_second_two_weeks,
)


def get_schedule_due_dates(schedule, last_date=datetime.now().date().replace(month=12, day=31)):
    start_date = datetime.now()
    end_date = datetime.combine(last_date, datetime.max.time())
    schedules = []
    while start_date <= end_date:
        next_due_schedule = schedule.remaining_estimate(start_date)
        due_date = datetime.now() + next_due_schedule
        if due_date <= end_date:
            schedules.append(due_date)
        else:
            break
        start_date = due_date + timedelta(days=1)
    return schedules


def report_schedule(request):
    schedules = []
    schedules.extend(get_schedule_due_dates(first_monday_april_to_july))
    schedules.extend(get_schedule_due_dates(august_every_monday_first_two_weeks))
    schedules.extend(get_schedule_due_dates(august_weekdays_second_two_weeks))
    return render(
        request,
        "reports/report_schedule.html",
        "schedules": sorted(schedules),
    )

【讨论】:

以上是关于Celery Crontab 中的输出日期调度到 Django 模板的主要内容,如果未能解决你的问题,请参考以下文章

Schedule—比 Celery 更轻量级的周期任务调度工具

使用 Flask 动态调度 Celery Beat 任务

python中的轻量级定时任务调度库:schedule

Celery Beat 调度程序不会在 Django 中执行我的任务

python中的轻量级定时任务调度库:schedule

为啥使用 Celery 运行计划任务比使用 crontab 更可取?