Celery 收到未注册类型的任务(运行示例)

Posted

技术标签:

【中文标题】Celery 收到未注册类型的任务(运行示例)【英文标题】:Celery Received unregistered task of type (run example) 【发布时间】:2012-04-03 21:33:57 【问题描述】:

我正在尝试从 Celery 文档中运行 example。

我跑:celeryd --loglevel=INFO

/usr/local/lib/python2.7/dist-packages/celery/loaders/default.py:64: NotConfigured: No 'celeryconfig' module found! Please make sure it exists and is available to Python.
  "is available to Python." % (configname, )))
[2012-03-19 04:26:34,899: WARNING/MainProcess]  

 -------------- celery@ubuntu v2.5.1
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** ---   . broker:      amqp://guest@localhost:5672//
- ** ----------   . loader:      celery.loaders.default.Loader
- ** ----------   . logfile:     [stderr]@INFO
- ** ----------   . concurrency: 4
- ** ----------   . events:      OFF
- *** --- * ---   . beat:        OFF
-- ******* ----
--- ***** ----- [Queues]
 --------------   . celery:      exchange:celery (direct) binding:celery

tasks.py:

# -*- coding: utf-8 -*-
from celery.task import task

@task
def add(x, y):
    return x + y

run_task.py:

# -*- coding: utf-8 -*-
from tasks import add
result = add.delay(4, 4)
print (result)
print (result.ready())
print (result.get())

在同一个文件夹中 celeryconfig.py:

CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_RESULT_EXPIRES = 300

当我运行“run_task.py”时:

在 python 控制台上

eb503f77-b5fc-44e2-ac0b-91ce6ddbf153
False

celeryd 服务器上的错误

[2012-03-19 04:34:14,913: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
'retries': 0, 'task': 'tasks.add', 'utc': False, 'args': (4, 4), 'expires': None, 'eta': None, 'kwargs': , 'id': '841bc21f-8124-436b-92f1-e3b62cafdfe7'

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 444, in receive_message
    self.strategies[name](message, body, message.ack_log_error)
KeyError: 'tasks.add'

请说明问题所在。

【问题讨论】:

您好,您能分享一下问题是什么以及您是如何解决的吗?接受的答案并没有说明其他人如何解决这个问题。谢谢。 我和乔丹在一起——这根本没用。投反对票。 aiho的答案是正确的:CELERY_IMPORTS = ("tasks", ) 【参考方案1】:

我认为您需要重新启动工作服务器。遇到同样的问题,重启解决了。

【讨论】:

这为我解决了问题。如果你使用 celeryd 脚本,worker 会在启动时导入你的任务模块。即使您随后创建更多任务函数或更改现有任务函数,worker 仍将使用其内存中的副本,就像读取它们时一样。 注意:您可以通过运行celery inspect registered来验证您的任务是否已注册 您也可以使用选项--autoreload 启动 celery,每次更改代码时都会重新启动 celery。 很遗憾已弃用。可以使用此链接中的解决方案:avilpage.com/2017/05/…【参考方案2】:

我遇到了同样的问题: "Received unregistered task of type.." 的原因是 celeryd 服务在服务启动时没有找到并注册任务(顺便说一下,当你启动时它们的列表是可见的 ./manage.py celeryd --loglevel=info)。

这些任务应该在设置文件的CELERY_IMPORTS = ("tasks", ) 中声明。 如果你有一个特殊的 celery_settings.py 文件,它必须在 celeryd 服务启动时声明为 --settings=celery_settings.py,正如 digivampire 所写。

【讨论】:

谢谢,我确实遇到了这个问题,因为我使用 ~/path/to/celery/celeryd 而不是使用 manage.py 命令来启动 celery!【参考方案3】:

您可以在celery.registry.TaskRegistry 类中查看当前已注册任务的列表。可能是您的 celeryconfig(在当前目录中)不在 PYTHONPATH 中,因此 celery 找不到它并回退到默认值。只需在启动 celery 时明确指定即可。

celeryd --loglevel=INFO --settings=celeryconfig

您也可以设置--loglevel=DEBUG,您应该会立即看到问题。

【讨论】:

+1 for --loglevel=DEBUG,我的任务中有语法错误。 celeryd 已过时。现在应该运行celery worker 例如Django 像这样celery --app=your_app.celery worker --loglevel=info 对我来说(celery 3.1.23),我必须使用celery.registry.tasks 来查看我当前所有任务的列表。您可以随时运行dir(celery.registry) 进行检查。 我这边的--loglevel=DEBUG也是【参考方案4】:

无论您使用CELERY_IMPORTS 还是autodiscover_tasks,重要的是能够找到任务,并且在 Celery 中注册的任务名称应与工作人员尝试获取的名称匹配。

当你启动 Celery 时,比如celery worker -A project --loglevel=DEBUG,你应该会看到任务的名称。例如,如果我的celery.py 中有一个debug_task 任务。

[tasks]
. project.celery.debug_task
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap

如果您在列表中看不到您的任务,请检查您的 celery 配置是否正确导入了任务,在 --setting--configceleryconfigconfig_from_object 中。

如果您使用的是 celery beat,请确保您在 CELERYBEAT_SCHEDULE 中使用的任务名称 task 与 celery 任务列表中的名称匹配。

【讨论】:

这很有帮助。任务名称需要与 CELERYBEAT_SCHEDULE 中的“任务”键匹配 *重要的是可以找到任务,并且在 Celery 中注册的任务的名称应该与工作人员尝试获取的名称相匹配。 * 好点!!! 这是正确答案。您在 BEAT_SCHEDULER 中的任务名称应与自动发现任务列表中显示的任何内容相匹配。因此,如果您使用@task(name='check_periodically'),那么它应该与您在节拍表中输入的内容相匹配,即:CELERY_BEAT_SCHEDULE = 'check_periodically': 'task': 'check_periodically', 'schedule': timedelta(seconds=1) 【参考方案5】:

我也有同样的问题;我加了

CELERY_IMPORTS=("mytasks")

在我的celeryconfig.py 文件中解决它。

【讨论】:

注意这应该是一个列表或者一个元组:CELERY_IMPORTS = ['my_module'] 这是为我做的 这对我有用【参考方案6】:
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

请包含=['proj.tasks'] 你需要去顶层目录,然后执行这个

celery -A app.celery_module.celeryapp worker --loglevel=info

不是

celery -A celeryapp worker --loglevel=info

在你的 celeryconfig.py 中输入 imports = ("path.ptah.tasks",)

请在其他模块调用任务!!!!!!!!!

【讨论】:

如果您使用相对导入,则需要添加 include 参数。我已经通过添加解决了我的问题【参考方案7】:

使用 --settings 对我不起作用。我必须使用以下方法才能使其正常工作:

celery --config=celeryconfig --loglevel=INFO

这是添加了 CELERY_IMPORTS 的 celeryconfig 文件:

# Celery configuration file
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'America/Los_Angeles'
CELERY_ENABLE_UTC = True

CELERY_IMPORTS = ("tasks",)

我的设置有点棘手,因为我使用主管将 celery 作为守护进程启动。

【讨论】:

【参考方案8】:

对我来说,这个错误是通过确保包含任务的应用程序包含在 django 的 INSTALLED_APPS 设置下来解决的。

【讨论】:

另外,需要从 /tasks.py 访问任务【参考方案9】:

对我有用的是为 celery 任务装饰器添加明确的名称。我将我的任务声明从@app.tasks 更改为@app.tasks(name='module.submodule.task')

这是一个例子

# test_task.py
@celery.task
def test_task():
    print("Celery Task  !!!!")

# test_task.py
@celery.task(name='tasks.test.test_task')
def test_task():
    print("Celery Task  !!!!")

【讨论】:

这对我也有用,但如果我在 name kwarg 中指明完整路径,则不行,但前提是我只是复制了名称,所以只需 celery.task(name='test_task')。愚蠢,但它奏效了。试图找出原因【参考方案10】:

就我而言,问题是我的项目没有正确选择autodiscover_tasks

celery.py 文件中,获取autodiscover_tasks 的代码是:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

我改成下面这样:

from django.apps import apps
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

祝你好运。

【讨论】:

好的,它确实有效,但为什么呢?顺便说一句谢谢:)【参考方案11】:

当我向我的 django 应用程序添加一些信号处理时,我神秘地出现了这个问题。这样做时,我将应用程序转换为使用 AppConfig,这意味着在 INSTALLED_APPS 中不是简单地读取为 'booking',而是读取为 'booking.app.BookingConfig'

Celery 不明白这是什么意思,所以我在我的 django 设置中添加了INSTALLED_APPS_WITH_APPCONFIGS = ('booking',),并修改了我的celery.py from

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.autodiscover_tasks(
    lambda: settings.INSTALLED_APPS + settings.INSTALLED_APPS_WITH_APPCONFIGS
)

【讨论】:

【参考方案12】:

我在运行 Celery Beat 的任务时遇到了同样的问题。 Celery 不喜欢相对导入,所以在我的 celeryconfig.py 中,我必须明确设置完整的包名:

app.conf.beat_schedule = 
   'add-every-30-seconds': 
        'task': 'full.path.to.add',
        'schedule': 30.0,
        'args': (16, 16)
    ,

【讨论】:

我希望 celery 文档有更多包含完整包名的示例。在这个答案中看到 full.path.to.add 后,我发现我不需要导入。我知道解决方案很简单,只需要一个更好的 app.conf.beat_schedule 示例。【参考方案13】:

尝试在 Python Shell 中导入 Celery 任务 - 由于导入语句错误,Celery 可能会默默地注册您的任务失败。

我的 tasks.py 文件中有一个 ImportError 异常,导致 Celery 无法在模块中注册任务。所有其他模块任务均已正确注册。

在我尝试在 Python Shell 中导入 Celery 任务之前,此错误并不明显。我修复了错误的导入语句,然后成功注册了任务。

【讨论】:

这也是我的情况。缺少导入。问题是芹菜只是默默地失败了。【参考方案14】:

奇怪的是,这也可能是因为缺少包裹。运行 pip 以安装所有必需的软件包: pip install -r requirements.txt

autodiscover_tasks 没有选择使用丢失包的任务。

【讨论】:

我遇到了类似的问题。我认为导入期间发生的异常会导致部分自动发现无法完成。 啊,是的,有道理。谢谢【参考方案15】:

我对 Django 没有任何问题。但是在我使用 Flask 时遇到了这个问题。解决方案是设置配置选项。

celery worker -A app.celery --loglevel=DEBUG --config=settings

在使用 Django 时,我有:

python manage.py celery worker -c 2 --loglevel=info

【讨论】:

【参考方案16】:

我也遇到过这个问题,但不太一样,仅供参考。由于此装饰器语法,最近的升级会导致此错误消息。

ERROR/MainProcess] Received unregistered task of type 'my_server_check'.

@task('my_server_check')

只好改成

@task()

不知道为什么。

【讨论】:

【参考方案17】:

我已经解决了我的问题,我的“任务”在一个名为“celery_task”的python包下,当我退出这个包时,运行命令celery worker -A celery_task.task --loglevel=info。有用。

【讨论】:

这确实有效,设置模块/包名称【参考方案18】:

如果您在安装的应用程序中使用应用程序配置,如下所示:

LOCAL_APPS = [
'apps.myapp.apps.MyAppConfig']

然后在您的配置应用程序中,以 ready 方法导入任务,如下所示:

from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'apps.myapp'

    def ready(self):
        try:
            import apps.myapp.signals  # noqa F401
            import apps.myapp.tasks
        except ImportError:
            pass

【讨论】:

【参考方案19】:

正如其他一些答案已经指出的那样,celery 会默默忽略任务的原因有很多,包括依赖问题以及任何语法或代码问题。

找到它们的一种快速方法是运行:

./manage.py check

很多时候,修复了报错后,celery 就可以识别任务了。

【讨论】:

【参考方案20】:

您是否包含了您的 tasks.py 文件或存储异步方法的任何位置?

app = Celery('APP_NAME', broker='redis://redis:6379/0', include=['app1.tasks', 'app2.tasks', ...])

【讨论】:

【参考方案21】:

如果你使用 Docker,就像上面所说的 @here 会杀死你的痛苦。

docker stop $(docker ps -a -q)

【讨论】:

如果您使用的是 docker 或 docker-compose,这就是答案。重新构建,由于某种原因,它不能正常工作。我怀疑为什么,但不是探索它们的时间。不只是重启,重建。【参考方案22】:

如果你遇到这种错误,有很多可能的原因,但我发现的解决方案是我在 /etc/defaults/celeryd 中的 celeryd 配置文件被配置为标准使用,而不是我的特定 django 项目.我将其转换为celery docs 中指定的格式后,一切正常。

【讨论】:

【参考方案23】:

我将这一行添加到 /etc/default/celeryd 的解决方案

CELERYD_OPTS="-A tasks"

因为当我运行这些命令时:

celery worker --loglevel=INFO
celery worker -A tasks --loglevel=INFO

只有后一个命令显示任务名称。

我也尝试过添加 CELERY_APP 行 /etc/default/celeryd 但这也没有用。

CELERY_APP="tasks"

【讨论】:

【参考方案24】:

我在 django-celery 中遇到了 PeriodicTask 类的问题,而在每次触发执行时启动 celery worker 时它们的名称显示得很好:

KeyError: u'my_app.tasks.run'

我的任务是一个名为“CleanUp”的类,而不仅仅是一个名为“run”的方法。

当我检查表 'djcelery_periodictask' 时,我看到了过时的条目,删除它们解决了这个问题。

【讨论】:

【参考方案25】:

只是为了我这个错误的情况加两分钱......

我的路径是/vagrant/devops/test,其中包含app.py__init__.py

当我运行 cd /vagrant/devops/ && celery worker -A test.app.celery --loglevel=info 时出现此错误。

但是当我像cd /vagrant/devops/test && celery worker -A app.celery --loglevel=info 一样运行它时,一切正常。

【讨论】:

【参考方案26】:

我发现我们的一位程序员在其中一个导入中添加了以下行:

os.chdir(<path_to_a_local_folder>)

这导致 Celery worker 将其工作目录从项目的默认工作目录(它可以在其中找到任务)更改为不同的目录(在其中找不到任务)。

去掉这行代码后,所有任务都找到并注册了。

【讨论】:

【参考方案27】:

Celery 不支持相对导入,所以在我的 celeryconfig.py 中,您需要绝对导入。

CELERYBEAT_SCHEDULE = 
        'add_num': 
            'task': 'app.tasks.add_num.add_nums',
            'schedule': timedelta(seconds=10),
            'args': (1, 2)
        

【讨论】:

【参考方案28】:

一个非常有用的列表的附加项目。

我发现 Celery 对于任务中的错误(或者至少我无法跟踪相应的日志条目)是不可原谅的,并且它没有注册它们。我在将 Celery 作为服务运行时遇到了许多问题,这些问题主要与权限相关。

与写入日志文件的权限相关的最新消息。我在命令行中开发或运行 celery 没有任何问题,但服务报告该任务未注册。

我需要更改日志文件夹权限以使服务能够写入。

【讨论】:

【参考方案29】:

我的 2 美分

我在使用 alpine 的 docker 镜像中得到了这个。 django 设置引用/dev/log 用于记录到系统日志。 django 应用程序和 celery worker 都基于相同的图像。 django 应用程序映像的入口点在启动时启动 syslogd,但芹菜工人的入口点不是。这导致像./manage.py shell 这样的事情失败,因为不会有任何/dev/log。芹菜工人没有失败。相反,它只是默默地忽略了应用程序启动的其余部分,其中包括从 django 项目中的应用程序加载 shared_task 条目

【讨论】:

【参考方案30】:

在我的情况下,错误是因为一个容器在使用 docker-compose 安装在主机文件系统上的文件夹中创建了文件。

我只需要删除主机系统上容器创建的文件,就可以再次启动我的项目。

sudo rm -Rf 文件夹名称

(我不得不使用 sudo,因为文件归 root 用户所有)

Docker 版本:18.03.1

【讨论】:

以上是关于Celery 收到未注册类型的任务(运行示例)的主要内容,如果未能解决你的问题,请参考以下文章

celery - 尽管已注册,但任务未运行。芹菜控制台不反映任务的接收

celery 教程:未注册错误

我可以单独查看和删除 Celery / RabbitMQ 任务吗?

Django 1.9 + Celery 未注册任务

Django 和 Celery 的示例:周期性任务

celery 未处理的 celery 任务