如何将定期任务发送到 Celery 中的特定队列

Posted

技术标签:

【中文标题】如何将定期任务发送到 Celery 中的特定队列【英文标题】:How to send periodic tasks to specific queue in Celery 【发布时间】:2013-06-02 11:34:58 【问题描述】:

默认情况下,Celery 将所有任务发送到“celery”队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

调度器设置:

CELERYBEAT_SCHEDULE = 
   'installer_recalc_hour': 
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    ,

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

运行工人:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

此方案未按预期工作:此工作人员将定期任务发送到“celery”队列,而不是“celery_periodic”。我该如何解决?

附:芹菜==3.0.16

【问题讨论】:

这有帮助吗? docs.celeryproject.org/en/latest/userguide/routing.html 我关闭了 CELERYBEAT_SCHEDULER 选项(使用了基于文件的停止)并且它工作正常。 【参考方案1】:

定期任务通过 celery beat 发送到队列,您可以在其中执行使用 Celery API 执行的所有操作。以下是 celery beat 自带的配置列表:

https://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#available-fields

在你的情况下:

CELERYBEAT_SCHEDULE = 
   'installer_recalc_hour': 
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15,  # every 15 sec for test
        'options': 'queue' : 'celery_periodic',  # options are mapped to apply_async options
    ,

【讨论】:

好吧,两者都以某种方式回答了这个问题......但是,我同意这个答案比接受的答案略好,因为问题作者要求 CELERYBEAT_SCHEDULE 更改......【参考方案2】:

我找到了解决这个问题的方法:

1) 首先我改变了配置周期性任务的方式。我像这样使用 @periodic_task 装饰器:

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options='queue': 'celery_periodic')
def recalc_last_hour():
    dt = datetime.utcnow()
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) \
                - timedelta(hours=1)
    log.debug('Generating task for hour %s', str(prev_hour))
    recalc_hour.delay(prev_hour)

2) 我在 @periodic_task 的参数中写了两次 celery_periodic

queue='celery_periodic' 选项在您从代码(.delay 或 .apply_async)调用任务时使用

options='queue': 'celery_periodic' 选项在 celery beat 调用时使用。

我敢肯定,如果您使用 CELERYBEAT_SCHEDULE 变量配置周期性任务,同样的事情也是可能的。

UPD。此解决方案适用于 CELERYBEAT_SCHEDULER 的基于数据库和基于文件的存储。

【讨论】:

@periodic_task(如果我理解正确的话)现在已被弃用 您能分享一下如何为 celery v5.1.2 重构相同的代码吗? (因为 period_task 装饰器已被弃用)【参考方案3】:

如果您使用的是 djcelery 数据库调度程序,您可以在执行选项 -> 队列字段中指定队列

【讨论】:

以上是关于如何将定期任务发送到 Celery 中的特定队列的主要内容,如果未能解决你的问题,请参考以下文章

芹菜:在特定时间间隔后执行任务

CELERY 定时任务

如何检查处理 Celery 任务的队列

如何使用 Celery 和 Django 将任务路由到不同的队列

php怎么调用celery任务

浅谈 Celery 分布式队列