Flask + Celery + Redis:消费者:无法连接到 amqp://guest:**@127.0.0.1:5672//:超时

Posted

技术标签:

【中文标题】Flask + Celery + Redis:消费者:无法连接到 amqp://guest:**@127.0.0.1:5672//:超时【英文标题】:Flask + Celery + Redis: consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: timed out 【发布时间】:2019-01-10 11:32:33 【问题描述】:

我设置了一个简单的 celery 任务。要运行它,我首先启动了 redis-server,然后激活了虚拟环境并输入“celery beat”,打开一个新的终端窗口引导到虚拟环境并输入“celery worker”

Flask==1.0.2
celery==4.2.1
requests==2.19

这是之后的错误信息:

消费者:无法连接到 amqp://guest:**@127.0.0.1:5672//:定时 出去。

这是执行“celery beat”后显示的配置详细信息:

配置-> .经纪人 -> amqp://guest:**@localhost:5672// .加载器-> celery.loaders.default.Loader .调度程序-> celery.beat.PersistentScheduler . db -> celerybeat-schedule .日志文件 -> [stderr]@%WARNING .最大间隔 -> 5.00 分钟(300 秒)

flask-proj/app/__init__.py

from flask import Flask, request, jsonify
from celery import Celery
import celeryconfig

app = Flask(__name__)
app.config.from_object('config')

def make_celery(app):
    # create context tasks in celery
    celery = Celery(
        app.import_name,
        broker=app.config['BROKER_URL']
    )
    celery.conf.update(app.config)
    celery.config_from_object(celeryconfig)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery

celery = make_celery(app)

@app.route("/")
def hello():
    return "Hello World!"

flask-proj/tasks/test.py

import celery

@celery.task()
def print_hello():
    logger = print_hello.get_logger()
    logger.info("Hello")

flask-proj/config.py

import os

REDIS_HOST = "127.0.0.1" REDIS_PORT = 6379 BROKER_URL = environ.get('REDIS_URL', "redis://host:port/0".format(
    host=REDIS_HOST, port=str(REDIS_PORT))) CELERY_RESULT_BACKEND = BROKER_URL

flask-proj/celeryconfig.py

from celery.schedules import crontab

CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'

CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERYBEAT_SCHEDULE = 
    'test-celery': 
        'task': 'app.tasks.test.print_hello',
        # Every minute
        'schedule': crontab(minute="*"),
    

如果我需要提供其他详细信息,请告诉我。

【问题讨论】:

您可以尝试将_init_.py中的celery.conf.update(app.config) celery.config_from_object(celeryconfig) 行的顺序更改为celery.config_from_object(celeryconfig) celery.conf.update(app.config ) 【参考方案1】:

在 Django 中有同样的问题,但我的问题是在 settings.py 中使用“BROKER_URL”而不是“CELERY_BROKER_URL”。 Celery 没有找到 URL,默认使用 rabbitmq 端口而不是 redis 端口。

【讨论】:

【参考方案2】:

make_celery() 函数中删除 celery.conf.update(app.config) 行,因此它会像,

def make_celery(app):
    # create context tasks in celery
    celery = Celery(
        app.import_name,
        broker=app.config['BROKER_URL']
    )
    celery.conf.update(app.config) # remove this line.
    celery.config_from_object(celeryconfig)
    TaskBase = celery.Task

并且, 将flask-proj/config.py 的粘贴内容复制到flask-proj/celeryconfig.py。 因此他 flask-proj/celeryconfig.py 会喜欢,

from celery.schedules import crontab

import os

REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
BROKER_URL = os.environ.get(
    'REDIS_URL', "redis://host:port/0".format(
        host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL

CELERY_IMPORTS = ('app.tasks.test')
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'

CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERYBEAT_SCHEDULE = 
    'test-celery': 
        'task': 'app.tasks.test.print_hello',
        # Every minute
        'schedule': crontab(minute="*"),
    

【讨论】:

谢谢,是的,我不得不删除该行。不仅如此,我没有指定应用程序名称,它以某种方式默认为rabbitmq。当我运行这个:'celery beat -A app.celery'时,它可以工作。【参考方案3】:

amqp 是rabbitmq 而不是redis。

Redis 通常是

redis://:password@hostname:port/db_number

我会手动配置以查看它是否有效。

flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)

【讨论】:

感谢您的提示!我认为因为我没有指定应用程序名称,所以它默认为rabbitmq。当我运行这个:'celery beat -A app.celery'时,它可以工作。

以上是关于Flask + Celery + Redis:消费者:无法连接到 amqp://guest:**@127.0.0.1:5672//:超时的主要内容,如果未能解决你的问题,请参考以下文章

将使用 Celery 和 Redis 的 Flask 应用程序部署到 AWS:直接使用 Elastic Beanstalk 还是 EC2?

[Django] celery的替代品 funboost

Flask+Celery+Redis实现队列化异步任务

celery的使用

celery的使用

Flask- celery (芹菜)