解决 celery 和 django 中的循环导入问题

Posted

技术标签:

【中文标题】解决 celery 和 django 中的循环导入问题【英文标题】:Resolving circular imports in celery and django 【发布时间】:2014-12-10 07:44:00 【问题描述】:

我有一个 Django 应用程序,它使用 Celery 卸载一些任务。主要是延迟了数据库表中某些字段的计算。

所以,我有一个tasks.py:

from models import MyModel
from celery import shared_task

@shared_task
def my_task(id):
    qs = MyModel.objects.filter(some_field=id)
    for record in qs:
        my_value = #do some computations
        record.my_field = my_value
        record.save()

在models.py中

 from django.db import models
 from tasks import my_task

 class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          my_task.delay(id)

现在显然,由于循环导入(models 导入 taskstasks 导入 models),这将不起作用。

我暂时通过从views.py 调用my_task.delay() 解决了这个问题,但是将模型逻辑保留在模型类中似乎是有意义的。有更好的方法吗?

【问题讨论】:

创建一个自定义的 ModelManager 并放入一个单独的文件中。 【参考方案1】:

joshua 贴的解决方案很好,但是我第一次尝试的时候发现我的@receiver装饰器没有效果。那是因为 tasks 模块没有在任何地方导入,这是预期的,因为我使用了 task auto-discovery。

但是,还有另一种方法可以将 tasks.pymodules.py 分离。也就是说,任务可以按名称发送,而不必在发送它们的过程中评估(导入):

from django.db import models
#from tasks import my_task
import celery

class MyModel(models.Model):
    field1 = models.IntegerField()
    #more fields
    my_field = models.FloatField(null=True)

    @staticmethod
    def load_from_file(file):
        #parse file, set fields from file
        #my_task.delay(id)
        celery.current_app.send_task('myapp.tasks.my_task', (id,))

send_task() 是 Celery 应用对象上的一个方法。

在此解决方案中,take care of correct, predictable names 对您的任务很重要。

【讨论】:

【参考方案2】:

在您的模型中,您可以在使用它之前导入它,而不是在文件开头导入my_task。它将解决循环导入问题。

from django.db import models

class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          from tasks import my_task   # import here instead of top
          my_task.delay(id)

或者,您也可以在 tasks.py 中执行相同的操作。您可以在使用之前导入模型,而不是开始。

替代方案:

您可以使用send_task 方法来调用您的任务

from celery import current_app
from django.db import models

class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          current_app.send_task('myapp.tasks.my_task', (id,))

【讨论】:

我不同意这是代码异味,我认为这是必需品。 我认为在这种情况下调用send_task 比使用信号要好得多【参考方案3】:

只是为了在这个列表中再添加一个不太好的解决方案,我最终做的是依赖django's now-built-in app registry。

所以在tasks.py 中,您可以使用apps.get_model() 来访问模型,而不是从模型中导入。

我用一个带有健康文档的辅助方法来做这件事,只是为了表达为什么这是痛苦的:

from django.apps import apps

def _model(model_name):
    """Generically retrieve a model object.

    This is a hack around Django/Celery's inherent circular import
    issues with tasks.py/models.py. In order to keep clean abstractions, we use
    this to avoid importing from models, introducing a circular import.

    No solutions for this are good so far (unnecessary signals, inline imports,
    serializing the whole object, tasks forced to be in model, this), so we
    use this because at least the annoyance is constrained to tasks.
    """
    return apps.get_model('my_app', model_name)

然后:

@shared_task
def some_task(post_id):
    post = _model('Post').objects.get(pk=post_id)

你当然可以直接使用apps.get_model()

【讨论】:

我喜欢这个解决方案。如果我没记错的话,django 的 AppConfig 功能是专门为这种情况而添加的,在这种情况下你还不能(由于某种原因)加载一些 django 模型。 我认为这是最好的方法!非常感谢!【参考方案4】:

使用信号。

tasks.py

from models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver

@shared_task
def my_task(id):
    qs = MyModel.objects.filter(some_field=id)
    for record in qs:
        my_value = #do some computations
        record.my_field = my_value
        record.save()

@receiver(my_signal)
def my_receiver(sender, **kwargs):
    my_task.delay(kwargs['id'])

models.py

 from django.db import models
 from tasks import my_task
 from django.dispatch import Signal

 my_signal = Signal(providing_args=['id'])

 class MyModel(models.Model):
      field1 = models.IntegerField()
      #more fields
      my_field = models.FloatField(null=True)

      @staticmethod
      def load_from_file(file):
          #parse file, set fields from file
          my_signal.send(sender=?, id=?)

【讨论】:

使用信号恕我直言是解决循环导入问题的糟糕解决方案。最后,您的代码很难理解。当您确实需要广播某些内容时会发出信号。 我同意这些信号会使代码混淆。 Piotr Ćwiek 使用 send_task() 的答案应该被接受 IMO 的答案。

以上是关于解决 celery 和 django 中的循环导入问题的主要内容,如果未能解决你的问题,请参考以下文章

Django:执行 cron 命令时,Celery 导入导致错误

Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止

由于 ImportError,Celery Django 部署因 Elastic Beanstalk 失败:无法导入名称“Celery”(ElasticBeanstalk::ExternalInvoc

Django-import-export-celery 导入错误 [Errno 13] 权限被拒绝

RabbitMQ 上的 Heroku、Django 和 celery

在 celery 3.1 中,制作 django 周期性任务