django redis celery 和 celery beats 的正确设置

Posted

技术标签:

【中文标题】django redis celery 和 celery beats 的正确设置【英文标题】:Correct setup of django redis celery and celery beats 【发布时间】:2018-07-13 21:50:45 【问题描述】:

我一直在尝试设置 django + celery + redis + celery_beats,但这给我带来了麻烦。文档非常简单,但是当我运行 django 服务器、redis、celery 和 celery beats 时,没有任何东西被打印或记录(我所有的测试任务都会记录一些东西)。

这是我的文件夹结构:

- aenima 
 - aenima
   - __init__.py
   - celery.py

 - criptoball
   - tasks.py

celery.py 看起来像这样:

from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aenima.settings')

app = Celery("criptoball")
app.conf.broker_url = 'redis://localhost:6379/0'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.timezone = 'UTC'

@app.task(bind=True)
def debug_task(self):
    print('Request: 0!r'.format(self.request))

app.conf.beat_schedule = 
    'test-every-30-seconds': 
        'task': 'tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    , 

task.py 看起来像这样:

from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging

from django_celery_beat.models import PeriodicTask, IntervalSchedule

cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)

test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))

@shared_task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

这是django_celery_beat 文档

不知道我错过了什么。当我跑步时

celery -A aenima beat -l debug --scheduler django_celery_beat.scheduler:DatabaseScheduler

celery -A aenima worker -l 调试

redis-cli ping 乒乓

django runserver 和 redis 服务器,我什么也没打印。

settings.py

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)

到目前为止,还没有找到任何关于这个话题的权威答案。

我想解决这一切,这个错误可能只是众多错误之一。非常感谢您的帮助!

编辑:

为 redis 添加了设置,以不同的方式声明了任务并提高了调试级别。现在的错误是:

收到 u'tasks.test_celery' 类型的未注册任务。讯息 已被忽略并丢弃。

您还记得导入包含此任务的模块吗?或许 您正在使用相对进口? KeyError: u'aenima.criptoball.tasks.test_celery'

我认为 Celery 的文档很差。

编辑 2 在尝试了一切之后,当我将所有任务放在同一个 celery.py 文件中时,它就起作用了。 @shared_task 不起作用,必须使用 @app.task 。

【问题讨论】:

你有一个芹菜工人在跑步吗?例如。从命令行,celery worker -A <your_module_name>。东西应该打印在你开始的终端上。 @Chris 更新了答案,增加了赏金。 看起来 celery -A aenima beat celery -A aenima worker -l info 可能是转录/格式错误。你的意思是把它们放在两行吗? @sytech 是的。感谢您的评论,已更正。 我建议你使用 django-q。不是你的问题的答案,但我能感觉到你的痛苦。我用了一段时间芹菜,然后我放弃了。问题太多,当你解决了一个问题时,你还有另一个问题要调试。使用 django-q 要容易得多。祝你好运 【参考方案1】:

我以前遇到过这些问题。这不是你的代码。这通常是环境的问题。 您应该运行virtualenv 下的所有内容,添加一个带有特定包版本requirements.txt 文件。

关于celery 4.xdjango 1.x 存在一个已知问题,因此您应该考虑您正在使用的软件包。

This 教程将详细解释如何使用 celery 构建virtualenv

如果您能告诉我您的软件包版本,我可能会尝试以不同的方式提供帮助。

编辑:

我认为这与您运行芹菜的方式有关。如果我们解决了第一个问题,试试这个:

celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

celery -A aenima.aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

您遇到的最新错误与您的模块发现有关。 先试试吧。

【讨论】:

我正在使用 virtualenv。我有 django 1.10 和 celery 4.1 。我应该升级 django 吗? 你应该将 celery 降级到 3.x。我在我的安装中做到了这一点,并且效果很好。 我可以在文档中添加一个注释,它写成Celery 4.0 supports Django 1.8 and newer versions. Please use Celery 3.1 for versions older than Django 1.8.吗?所以 4.x 应该能够与 django 1.10 一起工作,如果有什么我建议将 django 升级到更高版本:) 你说得对,但事实是它仍然存在问题。我认为我们仍然同意问题出在版本上。如果是我,我宁愿降级 celery 升级 django。如果你的项目很大,升级 django 可能会很头疼…… 是的,当您尝试升级 Django 时,它会带来很多麻烦,但是 Django 的版本最近增长得相当快,所以最终无论哪种方式都需要升级,但是是的。我同意这也可能是版本之间的差异。 :)【参考方案2】:

使用virtualenv 会很方便。

首先就像@Gal 说你需要确保你有celery 4.x

你可以通过pip来安装它:

pip 安装芹菜

当然您也可以安装4.x 版本,将其添加到您的requirements.txt 中,如下所示:

芹菜==4.1.0

或更高版本(如果将来可用)。

然后您可以使用以下命令重新安装所有软件包:

pip install -r requirements.txt

这将确保您安装了特定的 celery 包。

现在是 Celery 部分,虽然你的代码可能没有错,但我会写下我如何让我的 Celery 应用程序工作。

__init __.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app

__all__ = ['celery_app']

celery_conf.py:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from datetime import timedelta

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<PATH.TO.YOUR.SETTINGS>')

app = Celery('tasks')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Set a beat schedule to update every hour.
app.conf.beat_schedule = 
    'update-every-hour': 
        'task': 'tasks.update',
        'schedule': timedelta(minutes=60),
        'args': (16, 16),
    ,


# The default task that Celery runs.
@app.task(bind=True)
def debug_task(self):
    print('Request: 0!r'.format(self.request))

tasks.py:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import requests

from django.conf import settings
from django.http import HttpResponse

from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()


@python_2_unicode_compatible
class Update(Task):
    name = 'tasks.update'

    def run(self, *args, **kwargs):
        # Run the task you want to do.

""" For me the regular TaskRegistry didn't work to register classes, 
so I found this handy TaskRegistry demo and made use of it to 
register tasks as classes."""
class TaskRegistry(Task):

    def NotRegistered_str(self):
        self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))

    def assertRegisterUnregisterCls(self, r, task):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task)
        r.register(task)
        self.assertIn(task.name, r)

    def assertRegisterUnregisterFunc(self, r, task, task_name):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task_name)
        r.register(task, task_name)
        self.assertIn(task_name, r)

    def task_registry(self):
        r = TaskRegistry()
        self.assertIsInstance(r, dict, 'TaskRegistry is mapping')

        self.assertRegisterUnregisterCls(r, Update)

        r.register(Update)
        r.unregister(Update.name)
        self.assertNotIn(Update, r)
        r.register(Update)

        tasks = dict(r)
        self.assertIsInstance(
            tasks.get(Update.name), Update)

        self.assertIsInstance(
            r[Update.name], Update)

        r.unregister(Update)
        self.assertNotIn(Update.name, r)

        self.assertTrue(Update().run())

    def compat(self):
        r = TaskRegistry()
        r.regular()
        r.periodic()

正如我在代码中所解释的那样,常规的taskregistry 在 Celery 4.x 中内置的不起作用,所以我使用了演示任务注册表。 当然你也可以不使用类来做任务,但我更喜欢使用类。

settings.py:

# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'

# Celery routes
CELERY_IMPORTS = (
    'PATH.TO.tasks' # The path to your tasks.py
)

CELERY_DATABASE_URL = 
    'default': '<CELERY_DATABASE>', # You can also use your already being used database here


INSTALLED_APPS = [
    ...
    'PATH.TO.TASKS' # But exclude the tasks.py from this path
]

LOGGING = 
    ...
    'loggers': 
        'celery': 
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True,
        ,
    

我使用以下命令启动我的工人:

redis-server --daemonize 是的

celery multi start worker -A PATH.TO.TASKS -l info --beat #但排除tasks.py路径

我希望这些信息可以帮助您或任何在 Celery 上苦苦挣扎的人。

编辑:

请注意,我将工作程序作为守护程序启动,因此您实际上无法在控制台中看到日志。 对我来说,它记录在 .txt 文件中。

另外还要注意要使用的路径,例如对于某些您需要包含应用程序的路径,如下所示:

project.apps.app

对于其他情况,您还需要包含不带.py 的tasks.py,我写下了何时排除该文件以及何时不排除。

编辑 2:

@shared_task 装饰器返回一个始终使用 current_app 中的任务实例的代理。 这使得 @shared_task 装饰器对库和可重用应用程序很有用,因为它们无法访问用户的应用程序。

请注意@shared_task 无权访问用户的应用程序。 您当前尝试注册的应用无权访问您的应用。 您实际要用于注册任务的方法是:

from celery import Celery
app = Celery()

@app.task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

【讨论】:

感谢您的回答...更改了我的代码,您认为它必须打印类似的内容吗?我应该如何知道测试是否正在执行? @alejoss 如果你想在控制台中看到一些东西,那么你首先需要运行 redis 作为守护进程 redis-server --daemonize yes 然后 celery worker 不是作为守护进程 celery -A aenima -l info --beat,你还必须启用celery logger 在你的 django 设置中,你必须查找如何设置 celery logger。 添加了一个编辑,我想我更接近让它工作。出现了一个新的错误。如果可以的话,请看一下。 @alejoss 我给了你如何注册任务的完整设置,你的任务只是没有注册,应该很容易解决谷歌搜索? 请不要推荐 Celery 4.1——它有一些错误会阻止周期任务正常运行。【参考方案3】:

收到 u'tasks.test_celery' 类型的未注册任务。该消息已被忽略并丢弃。

您还记得导入包含此任务的模块吗?或者您可能正在使用相对导入?

可能你的任务路径不正确,应该是:

app.conf.beat_schedule = 
    'test-every-30-seconds': 
        'task': 'criptoball.tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    , 

tasks.test_celery应该是完整路径:criptoball.tasks.test_celery

【讨论】:

【参考方案4】:

你应该修复一件事,使用:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

告诉 Celery 如果您使用的是 Celery 3.x,您希望它发现哪些应用程序的任务。

【讨论】:

以上是关于django redis celery 和 celery beats 的正确设置的主要内容,如果未能解决你的问题,请参考以下文章

如何在 django celery 中处理未完成的任务

Django中使用celery来异步处理和定时任务

Celery 分布式任务队列快速入门

Django+Celery+Redis 使用

我在使用 Celery、Redis 和 Django 时遇到问题

使用 django、celery 和 redis 的一项一项任务