如何将定期任务发送到 Celery 中的特定队列
Posted
技术标签:
【中文标题】如何将定期任务发送到 Celery 中的特定队列【英文标题】:How to send periodic tasks to specific queue in Celery 【发布时间】:2013-06-02 11:34:58 【问题描述】:默认情况下,Celery 将所有任务发送到“celery”队列,但您可以通过添加额外参数来更改此行为:
@task(queue='celery_periodic')
def recalc_last_hour():
log.debug('sending new task')
recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example
调度器设置:
CELERYBEAT_SCHEDULE =
'installer_recalc_hour':
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15 # every 15 sec for test
,
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
运行工人:
python manage.py celery worker -c 1 -Q celery_periodic -B -E
此方案未按预期工作:此工作人员将定期任务发送到“celery”队列,而不是“celery_periodic”。我该如何解决?
附:芹菜==3.0.16
【问题讨论】:
这有帮助吗? docs.celeryproject.org/en/latest/userguide/routing.html 我关闭了 CELERYBEAT_SCHEDULER 选项(使用了基于文件的停止)并且它工作正常。 【参考方案1】:定期任务通过 celery beat 发送到队列,您可以在其中执行使用 Celery API 执行的所有操作。以下是 celery beat 自带的配置列表:
https://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields
在你的情况下:
CELERYBEAT_SCHEDULE =
'installer_recalc_hour':
'task': 'stats.installer.tasks.recalc_last_hour',
'schedule': 15, # every 15 sec for test
'options': 'queue' : 'celery_periodic', # options are mapped to apply_async options
,
【讨论】:
好吧,两者都以某种方式回答了这个问题......但是,我同意这个答案比接受的答案略好,因为问题作者要求 CELERYBEAT_SCHEDULE 更改......【参考方案2】:我找到了解决这个问题的方法:
1) 首先我改变了配置周期性任务的方式。我像这样使用 @periodic_task 装饰器:
@periodic_task(run_every=crontab(minute='5'),
queue='celery_periodic',
options='queue': 'celery_periodic')
def recalc_last_hour():
dt = datetime.utcnow()
prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
- timedelta(hours=1)
log.debug('Generating task for hour %s', str(prev_hour))
recalc_hour.delay(prev_hour)
2) 我在 @periodic_task 的参数中写了两次 celery_periodic:
queue='celery_periodic' 选项在您从代码(.delay 或 .apply_async)调用任务时使用
options='queue': 'celery_periodic' 选项在 celery beat 调用时使用。
我敢肯定,如果您使用 CELERYBEAT_SCHEDULE 变量配置周期性任务,同样的事情也是可能的。
UPD。此解决方案适用于 CELERYBEAT_SCHEDULER 的基于数据库和基于文件的存储。
【讨论】:
@periodic_task(如果我理解正确的话)现在已被弃用 您能分享一下如何为 celery v5.1.2 重构相同的代码吗? (因为 period_task 装饰器已被弃用)【参考方案3】:如果您使用的是 djcelery 数据库调度程序,您可以在执行选项 -> 队列字段中指定队列
【讨论】:
以上是关于如何将定期任务发送到 Celery 中的特定队列的主要内容,如果未能解决你的问题,请参考以下文章