在 Django 中使用 Celery 设置结果后端(rpc)

Posted

技术标签:

【中文标题】在 Django 中使用 Celery 设置结果后端(rpc)【英文标题】:Setting up a result backend (rpc) with Celery in Django 【发布时间】:2016-09-12 21:48:12 【问题描述】:

我正在尝试在本地计算机上为我正在处理的项目获取结果后端,但我遇到了问题。

目前我正在尝试创建一个队列系统,以便我的实验室创建案例。这是为了防止使用重复的序列号。我已经在使用 Celery 进行打印,所以我想我会创建一个新的 Celery 队列并用它来处理这个案例。前端还需要获取案例创建的结果,以显示已创建的案例编号。

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#rabbitmq

我按照上面的教程来配置我的 Celery。以下是出处:

celeryconfig.py:

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_EXCHANGE = 'celery'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False

CELERY_QUEUES = (
    Queue('celery',    routing_key="celery"),
    Queue('case_creation',       routing_key='create.#')
)

CELERY_ROUTES = 
    'case.tasks.create_case': 
        'queue': 'case_creation',
        'routing_key': 'create.1'
    ,
    'print.tasks.connect_and_serve': 
        'queue': 'celery',
        'routing_key': 'celery'
    

celery.py:

import os

from celery import Celery

from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings.local')

app = Celery('proj', broker='amqp://guest@localhost//')

app.config_from_object('proj.celeryconfig')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py:

import celery
from django.db import IntegrityError

from case.case_create import CaseCreate


@celery.task(bind=True)
def create_case(self, data, user, ip):
    try:
        acc = CaseCreate(data, user, ip)
        return acc.begin()
    except IntegrityError as e:
        self.retry(exc=e, countdown=2)

这是我调用上述任务的观点:

@require_authentication()
@requires_api_signature()
@csrf_exempt
@require_http_methods(['POST'])
def api_create_case(request):
    result = create_case.delay(json.loads(request.body.decode('utf-8')), request.user, get_ip_address(request))
    print(str(result))  # Prints the Task ID
    print(str(result.get(timeout=1)))  # Throws error
    return HttpResponse(json.dumps('result': str(result)), status=200)

我使用以下命令开始我的芹菜队列:

celery -A proj worker -Q case_creation -n case_worker -c 1

当我运行 celery worker 时,我确实看到结果显示在 config 下:

 -------------- celery@case_worker v3.1.16 (Cipater)
---- **** -----
--- * ***  * -- Windows-8-6.2.9200
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x32a2990
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> case_creation    exchange=celery(direct) key=create.#

当我运行程序并提交新案例时,这是我收到的错误消息:

No result backend configured.  Please see the documentation for more information.

我已经尝试了所有可以在网上找到的东西。有没有人可以指出我正确的方向?我非常接近,非常厌倦看这段代码。

【问题讨论】:

【参考方案1】:

如果你想保留你的结果,试试这个Keeping Results

app = Celery('proj', backend='amqp', broker='amqp://guest@localhost//')

编辑

确保客户端配置了正确的后端。

如果由于某种原因客户端被配置为使用与工作线程不同的后端,您将无法接收结果,因此请通过检查确保后端正确:

试试这个看看输出:

>>> result = task.delay(…)
>>> print(result.backend)

其他解决方案将代替

app = Celery('proj',
         backend='amqp',
         broker='amqp://',
         include=['proj.tasks'])

试试:

app = Celery('proj',
             broker='amqp://',
             include=['proj.tasks'])
app.conf.update(
    CELERY_RESULT_BACKEND='amqp'
)

【讨论】:

我尝试了 amqp 并收到相同的错误消息,显示“未配置结果后端”。当我启动 celery 时,我看到结果配置已更新为 amqp,但它仍然无法正常工作。 请注意,自 5.0 版以来 amqp 结果后端已被删除

以上是关于在 Django 中使用 Celery 设置结果后端(rpc)的主要内容,如果未能解决你的问题,请参考以下文章

celery 设置多少时间后运行

在 Django Celery 结果中使用临时文件

Django Celery 结果将任务 ID 设置为人类可读的内容?

如何在 Django 视图中使用 celery 存储延迟调用的结果?

在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存

Django Celery Beat 和任务结果