如何使用 celery 和 Django 以编程方式生成 celerybeat 条目

Posted

技术标签:

【中文标题】如何使用 celery 和 Django 以编程方式生成 celerybeat 条目【英文标题】:How to programmatically generate celerybeat entries with celery and Django 【发布时间】:2012-03-21 22:50:44 【问题描述】:

我希望能够以编程方式生成 celerybeat 条目并在添加条目时重新同步 celerybeat。文档here 状态

默认情况下,条目取自 CELERYBEAT_SCHEDULE 设置,但也可以使用自定义存储,例如将条目存储在 SQL 数据库中。

所以我想弄清楚我需要扩展哪些类才能做到这一点。

我一直在查看celery scheduler docs 和djcelery api docs,但是关于其中一些方法的作用的文档不存在,因此我将深入研究一些来源,只是希望有人能指出我正确的方向。

我想我正在做的高水平工作会有所帮助...作为用户,我需要能够从一组预定义的任务中进行选择,并为用户提供一种选择某种自定义计划的方式让它执行,比如每天/每周/每月以及什么日期和时间。

这也是 Django 中的 djcelery。

更新

我看到了 djcelery 管理员的代码,但不清楚这些数据是如何被持久化的。我目前有一个通用的addTask 视图,看起来像这样:

def addTask(request):

intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
intervalSchedule.save()
modelData = dict(
    name="dcTestPersist",
    task="technologytrackerapi.views.createRecord",
    schedule=intervalSchedule,
)
periodicTask = PeriodicTask(**modelData)
periodicTask.save()
return render_to_response('taskView.html')

数据库中的数据看起来是正确的,但是当守护程序运行时出现以下错误:

[2012-03-06 00:23:07,926:警告/节拍] 过程节拍: [2012-03-06 00:23:07,926: WARNING/Beat] Traceback(最近一次通话最后): [2012-03-06 00:23:07,926: WARNING/Beat] 文件“/usr/lib/python2.7/multiprocessing/process.py”,第 258 行,在 _bootstrap [2012-03-06 00:23:07,926: 警告/节拍] self.run() [2012-03-06 00:23:07,927:警告/节拍] 文件“/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py” ,第 464 行,正在运行 [2012-03-06 00:23:07,927: 警告/节拍] self.service.start(embedded_process=True) [2012-03-06 00:23:07,927:警告/节拍] 文件“/home/dchesterman/Documents/PythonDev /.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py” ,第 403 行,开始 [2012-03-06 00:23:07,927: 警告/节拍] 间隔 = self.scheduler.tick() [2012-03-06 00:23:07,927:警告/节拍] 文件“/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py” ,第 194 行,在刻度中 [2012-03-06 00:23:07,927: 警告/节拍] next_time_to_run = self.maybe_due(entry, self.publisher) [2012-03-06 00:23:07,927:警告/节拍] 文件“/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py” ,第 170 行,在 may_due [2012-03-06 00:23:07,927: 警告/节拍] is_due, next_time_to_run = entry.is_due() [2012-03-06 00:23:07,928:警告/节拍] 文件“/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/djcelery/schedulers.py” ,第 54 行,在 is_due [2012-03-06 00:23:07,928: 警告/节拍] 返回 self.schedule.is_due(self.last_run_at) [2012-03-06 00:23:07,928: WARNING/Beat] AttributeError: 'NoneType' 对象没有属性 'is_due'

我不确定为什么我的日程安排不使用默认的is_due()

【问题讨论】:

【参考方案1】:

这就是最终为我工作的东西:

def addTask(request):

  intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
  intervalSchedule.save()

  modelData = dict(
      name="dcTestPersist",
      task="technologytrackerapi.tasks.createRecord",
      interval_id=intervalSchedule.pk,
  )

  periodicTask = PeriodicTask(**modelData)
  periodicTask.save()

  me = ModelEntry(periodicTask)

  try:
      me.save()

  except:
    from django.db import connection
    print connection.queries
    raise

  return render_to_response('taskView.html')

我必须将定期任务包装在 ModelEntry 中。

【讨论】:

出于某种原因,我不明白我按照这个创建了我的计划任务并且不需要用ModelEntry 包装。不知道为什么或ModelEntry 是什么。随机注释 - 我也在设置 expires 并且很困惑它没有效果。它确实有效果,但在celeryd 而不是celerybeat - celerybeat 继续无限期触发,但在expires celeryd 之后会忽略它(状态revoked)。跨度> 它是 djcelery 的一部分,它是与 django 的集成。不确定你是否正在使用它。 github.com/celery/django-celery我会怀疑。看起来已弃用。这是 4 岁。 @Dustin sorry 与其说是要求,不如说是为之后可能出现的其他人做笔记。您的答案是在互联网上以编程方式为 CeleryBeat 安排 PeriodicTask 的最简单示例!其余的使用配置文件或只给出部分答案。 (我正在使用 djcelery - 我认为这就是数据库后端的来源,我 from djcelery.models import *。) ..虽然很抱歉应该澄清 PeriodicTask 在 celery 和 djcelery 中,正如你所暗示的那样 - 我使用 djcelery 版本。【参考方案2】:

我认为您想要做的是将PeriodicTasks 添加到数据库中。看起来https://github.com/ask/django-celery/blob/master/djcelery/admin.py 的底部部分是他们在管理中添加任务的方式——你需要在前端提供类似的东西。

【讨论】:

以上是关于如何使用 celery 和 Django 以编程方式生成 celerybeat 条目的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Django 中使用 Celery 上传和处理大型 excel 文件?

如何在 django 中将 api 放入 celery 任务中?

django celery的分布式异步之路 hello world

检查 celery beat 是不是启动并运行

如何使用 Django 配置 Celery 守护进程

如何在 Heroku 上使用 Channels 和 Celery 部署 Django?