芹菜不处理来自 RabbitMQ 的任务

Posted

技术标签:

【中文标题】芹菜不处理来自 RabbitMQ 的任务【英文标题】:Celery not processing tasks from RabbitMQ 【发布时间】:2018-04-23 02:09:27 【问题描述】:

我有一个 Celery 4.1 worker 配置为处理来自名为“longjobs”的队列中的任务,使用 RabbitMQ 作为我的消息传递后端。

我的 Celery 配置和工作人员通过 Django 1.11 项目进行管理。

没有任何错误,但从我的 Django 应用程序启动的任务永远不会被我的工作人员拾取。

我的celery.py 文件如下所示:

from __future__ import absolute_import
import os
import sys

from celery import Celery
from celery._state import _set_current_app
import django

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
_set_current_app(app)

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings.settings')
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../myproject')))
django.setup()
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

我的 Django Celery 设置是:

CELERY_IGNORE_RESULT = False
CELERY_TRACK_STARTED = True
CELERY_IMPORTS = (
    'myproject.myapp.tasks',
)
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_RESULT_PERSISTENT = True
CELERY_ALWAYS_EAGER = False
CELERY_ROUTES = 
    'mytask': 'queue': 'longjobs',

CELERY_WORKER_PREFETCH_MULTIPLIER = CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_SEND_TASK_ERROR_EMAILS = True
CELERY_ACKS_LATE = True
CELERY_TASK_RESULT_EXPIRES = 360000

我启动我的工人:

celery worker -A myproject -l info -n longjobs@%h -Q longjobs

在它的日志文件中,我看到:

[2017-11-09 16:51:03,218: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672/myproject
[2017-11-09 16:51:03,655: INFO/MainProcess] mingle: searching for neighbors
[2017-11-09 16:51:05,441: INFO/MainProcess] mingle: all alone
[2017-11-09 16:51:06,162: INFO/MainProcess] longjobs@localhost ready.

表明工作人员使用正确的虚拟主机和队列名称成功连接到 RabbitMQ。

我正在使用 Flower 和 RabbitMQ 管理界面进行调试。 Flower 确认我的 worker 正在运行,但表示它从未收到任何任务。

RabbitMQ 管理员有点陌生。它说我的“myproject”虚拟主机存在“longjob”队列,它也没有收到任何任务,但是有大量的队列名称为 UUID,它们具有不同数量的“就绪”任务待办的。其中之一有 200 多个任务。

为什么我的 Celery 工作者不能正确地从 RabbitMQ 拉取任务?我没有在任何日志文件中看到任何错误。我该如何诊断?

【问题讨论】:

你的任务在哪里?你怎么称呼一个? os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') 【参考方案1】:

尝试将 CELERY_ROUTES 更改为 CELERY_TASK_ROUTES(在 4.x 版本中)。

或者,我宁愿更换你的路由器:

CELERY_ROUTES = 
    'mytask': 'queue': 'longjobs',

到:

CELERY_ROUTES = 
    'mytask': 
        'exchange': 'longjobs',
        'exchange_type': 'longjobs',
        'routing_key': 'longjobs'
    

【讨论】:

【参考方案2】:

抱歉,我还不能发表评论,但你的节拍过程在哪里?您可以使用--beat 选项运行worker,不推荐用于生产,也可以单独运行beat 进程。

celery beat -A myproject -l info [--detach]

【讨论】:

以上是关于芹菜不处理来自 RabbitMQ 的任务的主要内容,如果未能解决你的问题,请参考以下文章

Flask- celery (芹菜)

检索芹菜队列中的任务列表

RabbitMQ 笔记-工作队列

为什么芹菜会给rabbitmq添加数千个队列,这些队列在任务完成后似乎会持续很长时间?

来自异步芹菜工作者的 SocketIO 发出不工作

Django 学习之Celery(芹菜)