未执行的任务(Django + Heroku + Celery + RabbitMQ)

Posted

技术标签:

【中文标题】未执行的任务(Django + Heroku + Celery + RabbitMQ)【英文标题】:Tasks not executing (Django + Heroku + Celery + RabbitMQ) 【发布时间】:2015-08-08 22:57:43 【问题描述】:

我是第一次使用 RabbitMQ,我一定对一些简单的配置设置有误解。 请注意,我现在在本地运行应用程序时遇到了这个问题;我还没有尝试通过 Heroku 投入生产。

对于这个应用程序,我想每隔 20 秒在数据库中查找一些未发送的消息,并通过 Twilio 发送它们。如果我在下面的示例中遗漏了一些相关代码,请提前道歉。我已遵循所有 Celery 设置/配置说明。这是我目前的设置:

BROKER_URL = 'amqp://VflhnMEP:8wGLOrNBP.........Bhshs'  # Truncated URL string

from datetime import timedelta
CELERYBEAT_SCHEDULE = 
    'send_queued_messages_every_20_seconds': 
        'task': 'comm.tasks.send_queued_messages',
        'schedule': timedelta(seconds=20),
        # 'schedule': crontab(seconds='*/20')
        ,
    
CELERY_TIMEZONE = 'UTC'

我很确定这些任务正在 RabbitMQ 中完成;这是我可以看到的所有累积消息的破折号:

“send_queued_messages”函数应该每 20 秒调用一次。

comm/tasks.py

导入日期时间 从 celery.decorators 导入周期性任务

from comm.utils import get_user_mobile_number
from comm.api import get_twilio_connection, send_message
from dispatch.models import Message

@periodic_task
def send_queued_messages(run_every=datetime.timedelta(seconds=20)):
    unsent_messages = Message.objects.filter(sent_success=False)
    connection = get_twilio_connection()

    for message in unsent_messages:
        mobile_number = get_user_mobile_number(message=message)
        try:
            send_message(
                connection=connection,
                mobile_number=mobile_number,
                message=message.raw_text
                )
            message.sent_success=True
            message.save()
        except BaseException as e:
            raise e
            pass

我很确定我在 RabbitMQ 或我的 Heroku 项目设置中配置错误,但我不确定如何继续进行故障排除。当我运行“celery -A myproject beat”时一切似乎都很顺利。

(venv)josephs-mbp:myproject josephfusaro$ celery -A myproject beat
celery beat v3.1.18 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://VflhnMEP:**@happ...Bhshs
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-05-27 03:01:53,810: INFO/MainProcess] beat: Starting...
[2015-05-27 03:02:13,941: INFO/MainProcess] Scheduler: Sending due task send_queued_messages_every_20_seconds (comm.tasks.send_queued_messages)
[2015-05-27 03:02:34,036: INFO/MainProcess] Scheduler: Sending due task send_queued_messages_every_20_seconds (comm.tasks.send_queued_messages)

那么,为什么在没有 Celery 参与的情况下,任务不能像它们那样执行*?

我的档案:

web: gunicorn myproject.wsgi --log-file - worker: celery -A myproject beat

*我已经确认我的代码在没有涉及 Celery 的情况下按预期执行!

【问题讨论】:

发布到哪个队列任务以及您拥有的工作人员或工作人员正在从哪个队列消费?默认的? 你在运行“celery -A myproject worker”吗? beat 本身不会运行任何任务。 @tuomur 我用“worker: celery -A myproject beat”更新了我的 Procfile,并扩大了 Heroku 中的 worker dyno。 @MauroRocco 老实说,我不是 100% 确定如何回答您的问题。当我阅读 Celery 文档时,我想我在某处看到 RabbitMQ 是默认设置。正如之前的评论中提到的,我有一个 Heroku dyno 作为工人运行,我想我误解了 RabbitMQ 作为默认值,知道如何路由累积的任务。也许我需要进一步研究一些 RabbitMQ 或 Celery 文档? 嗨,Celery 在 AMPQ 协议之上工作,这意味着它在绑定到队列或更多队列的交换器上发布消息。默认情况下,celery 使用一个队列和一个名为“celery”的交换器。理解为什么你的任务没有被执行的最好方法是检查 RabbitMQ 上的这个队列,看看消息是否仍然卡在那里。从rabbitMQ,您还可以查看是否连接了消费者,这将帮助您了解您是否连接了工作人员但可能连接到错误的队列。在你的 rabbitMQ rabbitmq.com/management.html 上启用它来检查所有这些东西 【参考方案1】:

特别感谢@MauroRocco 将我推向正确的方向。我错过的部分在本教程中得到了最好的解释:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

注意:我需要修改教程中的一些代码以使用 URLParameters,传入我的设置文件中定义的资源 URL。

send.py 和 receive.py 中唯一的一行是:

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))

当然我们需要从 settings.py 中导入 BROKER_URL 变量

from settings import BROKER_URL

settings.py

BROKER_URL = 'amqp://VflhnMEP:8wGLOrNBP...4.bigwig.lshift.net:10791/sdklsfssd'

send.py

import pika
from settings import BROKER_URL

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

receive.py

import pika
from settings import BROKER_URL

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

【讨论】:

以上是关于未执行的任务(Django + Heroku + Celery + RabbitMQ)的主要内容,如果未能解决你的问题,请参考以下文章

如何在不杀死未完成的芹菜任务的情况下重新启动heroku应用程序

Heroku静态文件未加载,Django

React 和 Django 网站未在 Heroku 上加载

django 应用程序不断迁移而未生效 [heroku]

Django [Mezzanine CMS] 项目未部署到 Heroku

Heroku:Django Migration 版本未在部署中运行