如何将自定义模型添加到 django celery

Posted

技术标签:

【中文标题】如何将自定义模型添加到 django celery【英文标题】:How to add custom model to django_celery 【发布时间】:2017-06-08 04:55:03 【问题描述】:

我正在努力使 celery 适合高可用性我已经分叉了 django_celery 项目和 celery 的这个分支,以便进行我需要的自定义。 celery 链接显示了使用以下代码对 beat.py 的修改:

我已将此 Lock 模型添加到 django_celery models.py 文件中,并且能够正常迁移:

from django.db import models

@python_2_unicode_compatible
class Lock(models.Model):
    name = models.CharField(max_length=127, unique=True)
    created = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name_plural = _('locks')

    def __str__(self):
        return self.name

在 utils 文件夹的 celery 中,我添加了这个 locked.py 文件:

from djcelery.models import Lock
from datetime import datetime, timedelta
from django.db import transaction, IntegrityError


class Locked(object):
    """A context manager to add a distributed mutex."""

    def __init__(self, name, timeout):
        self.name = name
        self.lock = None
        self.timeout = timeout

    def __enter__(self):
        # first delete any expired locks
        expired = datetime.utcnow() - timedelta(seconds=self.timeout)
        Lock.objects.filter(name=self.name, created__lte=expired).delete()
        # then try to get the lock
        try:
            Lock(name=self.name).save()
        except IntegrityError:
            transaction.rollback()
            raise LockError('Could not acquire lock: 0'.format(self.name))

    def __exit__(self, *args):
        Lock.objects.filter(name=self.name).delete()


class LockError(Exception):
    """Exception thrown when the requested lock already exists."""

    pass

通过这些更改,我可以运行以下命令:

celery worker
python manage.py runserver
python manage.py shell

当我尝试运行调度程序时出现问题:

celery beat

我收到以下错误:

Traceback (most recent call last):
  File "venv/bin/celery", line 11, in <module>
    load_entry_point('celery', 'console_scripts', 'celery')()
  File "/venv/src/celery/celery/__main__.py", line 30, in main
    main()
  File "/venv/src/celery/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/venv/src/celery/celery/bin/celery.py", line 793, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/venv/src/celery/celery/bin/base.py", line 311, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/venv/src/celery/celery/bin/celery.py", line 785, in handle_argv
    return self.execute(command, argv)
  File "/venv/src/celery/celery/bin/celery.py", line 717, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/venv/src/celery/celery/bin/base.py", line 315, in run_from_argv
    sys.argv if argv is None else argv, command)
  File "/venv/src/celery/celery/bin/base.py", line 377, in handle_argv
    return self(*args, **options)
  File "/venv/src/celery/celery/bin/base.py", line 274, in __call__
    ret = self.run(*args, **kwargs)
  File "/venv/src/celery/celery/bin/beat.py", line 72, in run
    beat = partial(self.app.Beat,
  File "/venv/lib/python2.7/site-packages/kombu/utils/__init__.py", line 325, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/venv/src/celery/celery/app/base.py", line 572, in Beat
    return self.subclass_with_self('celery.apps.beat:Beat')
  File "/venv/src/celery/celery/app/base.py", line 504, in subclass_with_self
    Class = symbol_by_name(Class)
  File "/venv/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "/venv/src/celery/celery/apps/beat.py", line 19, in <module>
    from celery import VERSION_BANNER, platforms, beat
  File "/venv/src/celery/celery/beat.py", line 35, in <module>
    from .utils.locked import Locked, LockError
  File "/venv/src/celery/celery/utils/locked.py", line 1, in <module>
    from djcelery.models import Lock
  File "/venv/src/django-celery/djcelery/models.py", line 30, in <module>
    class TaskMeta(models.Model):
  File "/venv/lib/python2.7/site-packages/django/db/models/base.py", line 105, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/venv/lib/python2.7/site-packages/django/apps/registry.py", line 237, in get_containing_app_config
    self.check_apps_ready()
  File "/venv/lib/python2.7/site-packages/django/apps/registry.py", line 124, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

我的INSTALLED_APPS 设置中有 djcelery,所以我不知道此时发生了什么?

【问题讨论】:

【参考方案1】:

您必须指定用于 celery 命令的应用实例

-A APP,--app=要使用的APP应用实例(例如module.attr_name)

例如,如果我有结构

pybilling
- pybilling
  - celeryconfig.py

那我应该用命令开始 celery beat

celery --app pybilling.celeryconfig:app beat

这里是 celeryconfig.py 的内容

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pybilling.settings')

from django.conf import settings  # noqa

app = Celery('pybilling')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

【讨论】:

【参考方案2】:

Celery 经常与 Django 一起使用,并且与 Django 兼容,但它固有地不是 Django 应用程序。您所做的修改是,当您运行celery beat 时,会加载 Django 模型。为了能够使用模型,必须首先初始化应用程序。执行此操作的标准方法是调用django.setup() 设置后,以便 Django 的代码可以找到 Django 设置。可能是这样的:

import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()

您需要将project.settings 更改为包含设置的实际模块名称。

【讨论】:

以上是关于如何将自定义模型添加到 django celery的主要内容,如果未能解决你的问题,请参考以下文章

Django Allauth - 如何将自定义 CSS 类添加到字段?

如何将自定义标签/过滤器添加到现有的 Django 应用程序?

如何正确地为 Django 的用户模型添加权限?

如何将自定义视图+控制器添加到模型?

将自定义 NER 模型添加到 spaCy 管道

将自定义占位符添加到 django-cms