Celery 任务在 Django 测试中没有抛出异常

Posted

技术标签:

【中文标题】Celery 任务在 Django 测试中没有抛出异常【英文标题】:Celery tasks not throwing exception in Django Tests 【发布时间】:2015-01-17 17:35:07 【问题描述】:

我的 Django 测试中包含了几个 celery 任务。不幸的是,通过 .delay() 调用任务时不会抛出异常。我将 CELERY_ALWAYS_EAGER 设置为 True。

tasks.py

import celeryapp as app

@app.task()
def exception_task():
    print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
    raise Exception('foo')

tests.py

def test_exception_in_task(self):
        from tasks import exception_task
        exception_task.delay()

输出

CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s

当删除 .delay 时,测试退出并出现预期的错误:

ERROR: test_exception_in_task
Exception: foo

版本

celery==3.1.4
Django==1.6.4

【问题讨论】:

【参考方案1】:

似乎我还必须将 CELERY_EAGER_PROPAGATES_EXCEPTIONS 设置为 True。

【讨论】:

这仍然适用于 Celery 5。【参考方案2】:

在celery 4.0下,我不得不使用CELERY_TASK_EAGER_PROPAGATES

【讨论】:

以上是关于Celery 任务在 Django 测试中没有抛出异常的主要内容,如果未能解决你的问题,请参考以下文章

在 Django 单元测试中使用 mock 修补 celery 任务

从 Celery 任务向 Channels 发送消息

在django中使用celery异步任务和定时任务

你如何对 Celery 任务进行单元测试?

python测试开发django-197.django-celery-beat 定时任务

python测试开发django-160.Celery 定时任务 (beat)