芹菜 + Django 信号
Posted
技术标签:
【中文标题】芹菜 + Django 信号【英文标题】:Celery + Django Signals 【发布时间】:2015-09-25 06:19:36 【问题描述】:我正在尝试将 Django 信号的 post_save 功能与 Celery 任务结合使用。将新的 Message 对象保存到数据库后,我想评估该实例是否具有两个属性之一,如果有,则调用 'send_sms_function' 这是 Celery 注册的任务。
tasks.py
from my_project.celery import app
@app.task
def send_sms_message(message):
# Do something
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
import rollbar
rollbar.init('234...0932', 'production')
from dispatch.models import Message
from comm.tasks import send_sms_message
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
if instance.some_attribute == 'A' or instance.some_attribute == 'B':
try:
send_sms_message.delay(instance)
except:
rollbar.report_exc_info()
else:
pass
我正在通过运行 Celery 工作人员在本地对此进行测试。当我在 Django shell 中并调用 Celery 函数时,它按预期工作。但是,当我将 Message 实例保存到数据库时,该函数不按预期工作:没有任何内容发布到任务队列,我没有看到任何错误消息。
我做错了什么?
【问题讨论】:
任务的参数中应该有可序列化的对象(int,str等),不允许有类或模型实例之类的东西;那是因为 celery 在真正运行任务之前,必须把它写成 redis 什么的,而 redis 是一个 REmote DIctionary Service,就像一个 JSON 存储;我建议在任务本身中传递 instance_id 并执行from django.apps import get_model; Model = get_model('app.model'); instance = Model.objects.get(pk=instance_id
,总是对我有用
【参考方案1】:
表达式if instance.some_attribute == 'A' or 'B'
可能是你的问题。
你的意思可能是:
if instance.some_attribute == 'A' or instance.some_attribute == 'B'
或者,我会怎么写:
if instance.some_attribute in ('A', 'B')
【讨论】:
【参考方案2】:您正在同步调用该函数,而不是对其进行排队:
send_sms_message.delay(instance)
应该将消息排队
http://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.delay
http://celery.readthedocs.org/en/latest/userguide/calling.html#basics
@dgel 还指出了一个逻辑错误
【讨论】:
谢谢@dm03514 我在这里更新了我的代码并在本地运行它,但是'send_sms_message'函数仍然没有进入任务队列并且没有触发。关于我做错了什么还有其他想法吗? 我以为我在某处读到您不应该从实例 attr 构建查询,但不确定为什么会出现这种情况或是否属实。只是一个想法。【参考方案3】:这看起来像是序列化和/或您的设置有问题。当 celery 将消息传递给您的代理时,它需要对数据进行一些表示。 Celery 序列化您给任务的参数,但如果您没有将其配置与您传递的内容一致(即您的代理期望 JSON 的位置不匹配,但您向它发送了一个腌制的 python 对象),任务可能会简单地失败因为工作人员无法轻松解码您发送的内容。如果您在 shell 中运行该函数(没有调用延迟),它将被同步调用,因此没有序列化或消息传递。
在您的设置中,您应该使用 JSON 序列化(除非您有充分的理由),但如果没有,那么您的酸洗可能有问题。当您运行 celery 时,您始终可以提高日志级别以进行调试,以查看有关序列化相关错误的更多信息:
celery -A yourapp worker -l debug
如有疑问,请使用该打印语句/函数来确保您的信号接收器正在运行。如果没有,您可以创建一个 AppConfig
类,在它的 ready
方法中导入您的接收器,或者使用其他一些合理的技术来确保您的接收器正在注册。
[意见] 我建议做这样的事情:
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
enqueue_message.delay(instance.id)
在yourmodule/tasks.py
@app.task
def enqueue_message(message_id):
msg = Message.object.get(id=message_id)
if msg.some_attribute in ('A', 'B'): # slick or
send_sms_message.delay(message_id)
您始终可以使用 Celery 的组合技术,但这里有一些不会增加请求/响应周期的复杂性的东西。 [/意见]
【讨论】:
如果您将 id 传递给实例,请确保在调用异步任务之前通过执行transaction.on_commit(lambda: enqueue_message.delay(instance.id))
或类似的方式保存对实例的更改,以确保消息仅在事务完成后发送。以上是关于芹菜 + Django 信号的主要内容,如果未能解决你的问题,请参考以下文章
Django 1.6 + RabbitMQ 3.2.3 + Celery 3.1.9 - 为啥我的芹菜工人死于:WorkerLostError:工人过早退出:信号11(SIGSEGV)