Celery 4 不自动发现任务

Posted

技术标签:

【中文标题】Celery 4 不自动发现任务【英文标题】:Celery 4 not auto-discovering tasks 【发布时间】:2018-04-13 02:40:42 【问题描述】:

我有一个 Django 1.11 和 Celery 4.1 项目,我已经根据setup docs 进行了配置。我的celery_init.py 看起来像

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproject.settings.settings'

app = Celery('myproject')

app.config_from_object('django.conf:settings', namespace='CELERY')

#app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # does nothing
app.autodiscover_tasks() # also does nothing

print('Registering debug task...')
@app.task(bind=True)
def debug_task(self):
    print('Request: 0!r'.format(self.request))

但是,当我启动一个工人时:

.env/bin/celery worker -A myproject -l info

它显示除了示例“debug_task”之外没有找到任何任务,即使我已经安装了几个带有 Celery 任务的应用程序,应该通过调用 app.autodiscover_task() 找到。这是我的工作人员生成的初始输出:

 -------------- celery@localhost v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-Ubuntu-16.04-xenial 2017-10-31 15:56:42
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         myproject:0x7f952856d650
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . myproject.celery_init.debug_task

[2017-10-31 15:56:42,180: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2017-10-31 15:56:42,188: INFO/MainProcess] mingle: searching for neighbors
[2017-10-31 15:56:43,211: INFO/MainProcess] mingle: all alone
[2017-10-31 15:56:43,237: INFO/MainProcess] celery@localhost ready.

我的应用程序 tasks.py 文件中的所有旧任务都定义为:

from celery.task import task

@task(name='mytask')
def mytask():
    blah

文档建议使用 shared_task 装饰器,所以我尝试了:

from celery import shared_task

@shared_task
def mytask():
    blah

但我的 Celery 工人仍然没有看到它。我做错了什么?

编辑:我已经能够通过在我的设置的CELERY_IMPORTS 列表中明确列出任务来显示它们,但即便如此我也必须大量编辑tasks.py 以删除我的 Django 项目的所有导入(模型.py 等)或引发异常 Apps aren't loaded yet. 这总比没有好,但需要大量重构。有没有更好的办法?

【问题讨论】:

运行celery worker -A app.tasks -l DEBUG时会发生什么我发现我需要指定应用的任务文件,而不是整个项目。 【参考方案1】:

我遇到了类似的问题,解决方案是将 include kwarg 添加到您的 celery 调用中。

include 参数是工作程序启动时要导入的模块列表。您需要在此处添加我们的任务模块,以便工作人员能够找到我们的任务。

app = Celery('myproject', 
             backend = settings.CELERY.get('backend'),
             broker = settings.CELERY.get('broker'),
             include = ['ingest.tasks.web', ... ])

查看http://docs.celeryproject.org/en/latest/getting-started/next-steps.html#proj-celery-py了解更多信息

【讨论】:

【参考方案2】:

只是在这里发布(我不知道为什么会这样)

from django.conf import settings

app.config_from_object(settings, namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)

force=True 似乎是解决方案

另一个可行的方法是在实例化 celery 之前调用 django.setup()

from __future__ import absolute_import, unicode_literals
import os

import django

from celery import Celery


django.setup()  # This is key


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('notifs')
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

这种方法避免了force=True,导入django.conf.settings,对我来说似乎更干净。虽然我仍然不知道你为什么需要打电话给django.setup,因为文档中没有说明。

【讨论】:

django.setup() 为我工作; force=True 没有。谢谢你的伎俩:)【参考方案3】:
from django.conf import settings    
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

您可以添加上面的行使 celery 自动发现整个项目中编写的所有任务。这对我有用。

【讨论】:

正如我的帖子和示例代码中所述,我尝试了这个,但没有成功。【参考方案4】:

我发现更大的问题是 Celery 没有将我的自定义 Celery() 实例设置为当前应用程序。为了解决这个问题,我不得不修改我的celery_init.py 以包括:

from celery._state import _set_current_app

# setup my app = Celery(...)

_set_current_app(app)

【讨论】:

【参考方案5】:

即使我也不确定这是否可行。但我想它对我有用:

在您提到的同一documentation 中有一个部分:

enter image description here

在此之后,我删除了 CELERY_IMPORTS,它可以注册我项目中所有应用程序的任务

【讨论】:

【参考方案6】:

请注意,如果您有基于 的任务 (CBT),例如在我们的项目中,那么上面的这些技巧仍然不起作用。例如:

from celery.app.task import Task

class CustomTask(Task):

    def run(self):
        print('running.')

我发现有两种解决方案/解决方法:

    Register the task 在任务类定义下面,并将任务实例分配为新的全局变量:

    from celery import current_app
    
    CustomTask = current_app.register_task(CustomTask())
    

    如果你有很多 CBT,这有点破坏性,并且现有的 CustomTask().apply_async() 调用应该转换为 CustomTask.apply_async(),因为 CustomTask 现在是一个对象/实例(以前是一个类名)。虽然装饰器可能会简化注册部分。

    使用旧的/向后兼容的基任务类作为父类,而不是celery.app.task.Task

    from celery.task import Task
    
    class CustomTask(Task):
        ...
    

    这在Celery 5.x 代码库中似乎已被弃用。但至少在我们升级之前它是目前最简单的解决方案(使用 Celery 4.x)。

【讨论】:

以上是关于Celery 4 不自动发现任务的主要内容,如果未能解决你的问题,请参考以下文章

重试丢失或失败的任务(Celery、Django 和 RabbitMQ)

Django 项目celery beat报错:Pidfile already exists

异步 celery 任务完成后自动调用 PHP 代码(celery-php)

Celery .delay() 同步工作,不延迟

芹菜自动重载不起作用

用supervisord运行芹菜无法发现任务