芹菜 + Django 信号

Posted

技术标签:

【中文标题】芹菜 + Django 信号【英文标题】:Celery + Django Signals 【发布时间】:2015-09-25 06:19:36 【问题描述】:

我正在尝试将 Django 信号的 post_save 功能与 Celery 任务结合使用。将新的 Message 对象保存到数据库后,我想评估该实例是否具有两个属性之一,如果有,则调用 'send_sms_function' 这是 Celery 注册的任务。

tasks.py

from my_project.celery import app

@app.task
def send_sms_message(message):
    # Do something

signals.py

from django.db.models.signals import post_save
from django.dispatch import receiver

import rollbar
rollbar.init('234...0932', 'production')

from dispatch.models import Message
from comm.tasks import send_sms_message


@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):

if instance.some_attribute == 'A' or instance.some_attribute == 'B':
    try:
        send_sms_message.delay(instance)
    except:
         rollbar.report_exc_info()
else:
    pass

我正在通过运行 Celery 工作人员在本地对此进行测试。当我在 Django shell 中并调用 Celery 函数时,它按预期工作。但是,当我将 Message 实例保存到数据库时,该函数按预期工作:没有任何内容发布到任务队列,我没有看到任何错误消息。

我做错了什么?

【问题讨论】:

任务的参数中应该有可序列化的对象(int,str等),不允许有类或模型实例之类的东西;那是因为 celery 在真正运行任务之前,必须把它写成 redis 什么的,而 redis 是一个 REmote DIctionary Service,就像一个 JSON 存储;我建议在任务本身中传递 instance_id 并执行 from django.apps import get_model; Model = get_model('app.model'); instance = Model.objects.get(pk=instance_id,总是对我有用 【参考方案1】:

表达式if instance.some_attribute == 'A' or 'B' 可能是你的问题。

你的意思可能是:

if instance.some_attribute == 'A' or instance.some_attribute == 'B'

或者,我会怎么写:

if instance.some_attribute in ('A', 'B')

【讨论】:

【参考方案2】:

您正在同步调用该函数,而不是对其进行排队:

send_sms_message.delay(instance)

应该将消息排队

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.delay

http://celery.readthedocs.org/en/latest/userguide/calling.html#basics

@dgel 还指出了一个逻辑错误

【讨论】:

谢谢@dm03514 我在这里更新了我的代码并在本地运行它,但是'send_sms_message'函数仍然没有进入任务队列并且没有触发。关于我做错了什么还有其他想法吗? 我以为我在某处读到您不应该从实例 attr 构建查询,但不确定为什么会出现这种情况或是否属实。只是一个想法。【参考方案3】:

这看起来像是序列化和/或您的设置有问题。当 celery 将消息传递给您的代理时,它需要对数据进行一些表示。 Celery 序列化您给任务的参数,但如果您没有将其配置与您传递的内容一致(即您的代理期望 JSON 的位置不匹配,但您向它发送了一个腌制的 python 对象),任务可能会简单地失败因为工作人员无法轻松解码您发送的内容。如果您在 shell 中运行该函数(没有调用延迟),它将被同步调用,因此没有序列化或消息传递。

在您的设置中,您应该使用 JSON 序列化(除非您有充分的理由),但如果没有,那么您的酸洗可能有问题。当您运行 celery 时,您始终可以提高日志级别以进行调试,以查看有关序列化相关错误的更多信息:

celery -A yourapp worker -l debug

如有疑问,请使用该打印语句/函数来确保您的信号接收器正在运行。如果没有,您可以创建一个 AppConfig 类,在它的 ready 方法中导入您的接收器,或者使用其他一些合理的技术来确保您的接收器正在注册。

[意见] 我建议做这样的事情:

@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
    enqueue_message.delay(instance.id)

yourmodule/tasks.py

@app.task
def enqueue_message(message_id):
    msg = Message.object.get(id=message_id)
    if msg.some_attribute in ('A', 'B'):  # slick or
        send_sms_message.delay(message_id)

您始终可以使用 Celery 的组合技术,但这里有一些不会增加请求/响应周期的复杂性的东西。 [/意见]

【讨论】:

如果您将 id 传递给实例,请确保在调用异步任务之前通过执行 transaction.on_commit(lambda: enqueue_message.delay(instance.id)) 或类似的方式保存对实例的更改,以确保消息仅在事务完成后发送。

以上是关于芹菜 + Django 信号的主要内容,如果未能解决你的问题,请参考以下文章

Django 1.6 + RabbitMQ 3.2.3 + Celery 3.1.9 - 为啥我的芹菜工人死于:WorkerLostError:工人过早退出:信号11(SIGSEGV)

芹菜任务不会在 django 中执行

芹菜登录 Django

芹菜任务未显示在 Django Admin 中

如何使用芹菜工人将 django 项目部署到谷歌云?

如何记录 Django 芹菜任务中发生的异常