你如何对 Celery 任务进行单元测试?
Posted
技术标签:
【中文标题】你如何对 Celery 任务进行单元测试?【英文标题】:How do you unit test a Celery task? 【发布时间】:2012-08-18 04:53:30 【问题描述】:Celery 文档 mentions testing Celery within Django 但没有说明如果您不使用 Django,如何测试 Celery 任务。你是怎么做到的?
【问题讨论】:
【参考方案1】:可以使用任何 unittest 库同步测试任务。在处理 celery 任务时,我通常会进行 2 次不同的测试。第一个(正如我在下面建议的那样)是完全同步的,应该是确保算法做它应该做的那个。第二个会话使用整个系统(包括代理)并确保我没有序列化问题或任何其他分发、通信问题。
所以:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
你的测试:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
希望有帮助!
【讨论】:
除了在使用 HttpDispatchTask 的任务上有效 - docs.celeryproject.org/en/latest/userguide/remote-tasks.html 我必须设置 celery.conf.CELERY_ALWAYS_EAGER = True 但即使同时设置 celery.conf.CELERY_IMPORTS = ('celery.task. http') 测试失败,NotRegistered: celery.task.http.HttpDispatchTask 奇怪,你确定你没有遇到一些导入问题吗?这个test 有效(请注意,我在伪造响应,所以它返回 celery 所期望的)。此外,CELERY_IMPORTS 中定义的模块将在workers initialization 期间导入,为了避免这种情况,我建议您致电celery.loader.import_default_modules()
。
我也建议你看看here。它模拟了http请求。不知道它是否有帮助,我猜你想测试一个已经启动并运行的服务,不是吗?【参考方案2】:
更新我七岁的答案:
您可以通过 pytest 夹具在单独的线程中运行工作人员:
https://docs.celeryproject.org/en/stable/userguide/testing.html#celery-worker-embed-live-worker
根据文档,您不应使用“always_eager”(请参阅上述链接的页面顶部)。
旧答案:
我用这个:
with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
...
文档:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER 让您可以同步运行任务,并且您不需要 celery 服务器。
【讨论】:
我认为这已经过时了 - 我得到了ImportError: No module named celeryconfig
。
我相信上面假设模块 celeryconfig.py
存在于一个包中。见docs.celeryproject.org/en/latest/getting-started/…。
我知道它已经过时了,但您能否提供一个完整示例,说明如何在TestCase
类中从 OP 的问题中启动任务 add
?
@MaxChrétien 抱歉,我无法提供完整的示例,因为我不再使用 celery。如果您有足够的声望点,您可以编辑我的问题。如果你没有足够的,那么请让我知道我应该复制+粘贴到这个答案中。
@miken32 谢谢。由于最近的答案以某种方式解决了我想帮助解决的问题,我只是留下了一条评论,即 4.0 的官方文档不鼓励使用 CELERY_TASK_ALWAYS_EAGER
进行单元测试。【参考方案3】:
取决于您要测试的具体内容。
直接测试任务代码。不要调用“task.delay(...)”,只需从单元测试中调用“task(...)”。 使用CELERY_ALWAYS_EAGER。这将导致您的任务在您说“task.delay(...)”时立即被调用,因此您可以测试整个路径(但不能测试任何异步行为)。【讨论】:
【参考方案4】:单元测试
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
py.test 夹具
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope='module')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
附录:让 send_task 尊重热切
from celery import current_app
def send_task(name, args=(), kwargs=, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
【讨论】:
【参考方案5】:对于那些使用 Celery 4 的人来说:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
由于设置名称已更改,如果您选择升级需要更新,请参阅
https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names
【讨论】:
根据official docs,使用“task_always_eager”(之前的“CELERY_ALWAYS_EAGER”)不适合单元测试。相反,他们提出了一些其他很好的方法来对 Celery 应用程序进行单元测试。 我只是补充一点,您不希望在单元测试中执行急切任务的原因是因为您没有进行测试,例如一旦您在生产中使用代码,将发生的参数序列化。【参考方案6】:从 Celery 3.0 开始,在 Django 中设置 CELERY_ALWAYS_EAGER
的一种方法是:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
【讨论】:
当 celery 任务在函数内部时,它似乎不起作用。【参考方案7】:reference 使用 pytest。
def test_add(celery_worker):
mytask.delay()
如果你使用flask,请设置应用配置
CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'cache+memory://'
在conftest.py
@pytest.fixture
def app():
yield app # Your actual Flask application
@pytest.fixture
def celery_app(app):
from celery.contrib.testing import tasks # need it
yield celery_app # Your actual Flask-Celery application
【讨论】:
【参考方案8】:自 Celery v4.0 起,py.test 固定装置是 provided 来启动一个 celery worker 仅用于测试并在完成后关闭:
def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: gen93553@mymachine.local (running)>
assert myfunc.delay().wait(3)
在http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test 中描述的其他固定装置中,您可以通过重新定义celery_config
固定装置来更改 celery 默认选项:
@pytest.fixture(scope='session')
def celery_config():
return
'accept_content': ['json', 'pickle'],
'result_serializer': 'pickle',
默认情况下,测试工作者使用内存中的代理和结果后端。如果不测试特定功能,则无需使用本地 Redis 或 RabbitMQ。
【讨论】:
亲爱的投票者,您想分享一下为什么这是一个糟糕的答案吗?衷心感谢。 对我不起作用,测试套件只是挂起。你能提供更多的上下文吗? (不过我还没有投票;))。 在我的情况下,我必须明确设置 celey_config 夹具以使用内存代理和缓存+内存后端【参考方案9】:就我而言(我假设还有很多其他情况),我只想使用 pytest 测试任务的内部逻辑。
TL;DR; 最终嘲笑一切(OPTION 2)
示例用例:
proj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
return a+b;
tests/test_tasks.py
from proj import add_task
def test_add():
assert add_task(1, 2) == 3, '1 + 2 should equal 3'
但是,由于 shared_task
装饰器做了很多 celery 内部逻辑,它并不是真正的单元测试。
所以,对我来说,有两个选择:
选项 1:分离内部逻辑
proj/tasks_logic.py
def internal_add(a, b):
return a + b;
proj/tasks.py
from .tasks_logic import internal_add
@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);
这看起来很奇怪,除了使其可读性降低之外,它还需要手动提取和传递请求中的属性,例如 task_id
以备不时之需,这使得逻辑不那么纯粹。
选项 2:模拟 嘲笑芹菜内脏
tests/__init__.py
# noinspection PyUnresolvedReferences
from celery import shared_task
from mock import patch
def mock_signature(**kwargs):
return
def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func
return mocked_shared_decorator
patch('celery.shared_task', mocked_shared_task).start()
然后允许我模拟请求对象(同样,如果您需要请求中的内容,例如 id 或重试计数器。
tests/test_tasks.py
from proj import add_task
class MockedRequest:
def __init__(self, id=None):
self.id = id or 1
class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)
def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'
此解决方案更加手动,但它为我提供了实际单元测试所需的控制,无需重复自己,也不会丢失 celery 范围。
【讨论】:
【参考方案10】:我在单元测试方法中看到了很多 CELERY_ALWAYS_EAGER = true
作为单元测试的解决方案,但是由于版本 5.0.5 可用,因此有很多更改使得大多数旧答案被弃用,对我来说有一段时间废话,所以对于在这里搜索解决方案的每个人,请转到文档并阅读新版本的记录良好的单元测试示例:
https://docs.celeryproject.org/en/stable/userguide/testing.html
对于带有单元测试的 Eager 模式,这里引用了实际文档:
渴望模式
task_always_eager 设置启用的渴望模式是 定义不适合单元测试。
在使用 Eager 模式进行测试时,您只是在测试模拟什么 发生在工人身上,并且两者之间存在许多差异 仿真和现实中发生的事情。
【讨论】:
文档似乎只适用于 pytest,而不是 unittest,这是 django 的默认设置。如果他们有一些使用标准 django 测试设置的示例,那就太酷了。以上是关于你如何对 Celery 任务进行单元测试?的主要内容,如果未能解决你的问题,请参考以下文章
在 Django 单元测试中使用 mock 修补 celery 任务