在Django中测试django-rq(python-rq)的最佳实践

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Django中测试django-rq(python-rq)的最佳实践相关的知识,希望对你有一定的参考价值。

我将在我的项目中开始使用django-rq。

Django与基于Redis的Python排队库RQ集成。

测试使用RQ的django应用程序的最佳做法是什么?

例如,如果我想将我的应用程序测试为黑盒子,那么在用户执行某些操作后,我想执行当前队列中的所有作业,然后检查我的数据库中的所有结果。我怎么能在我的django测试中做到这一点?

答案

我刚刚找到了django-rq,它允许你在一个测试环境中启动一个worker来执行队列中的任何任务然后退出。

from django.test impor TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Stuff that init jobs.
        get_worker().work(burst=True)  # Processes all jobs then stop.
        ...                      # Asserts that the job stuff is done.
另一答案

我把我的rq测试分成几块。

  1. 测试我是否正确地将东西添加到队列中(使用模拟)。
  2. 假设如果某些东西被添加到队列中,它最终将被处理。 (rq的测试套件应该涵盖这个)。
  3. 测试,如果输入正确,我的任务按预期工作。 (正常代码测试)。

正在测试的代码:

def handle(self, *args, **options):
    uid = options.get('user_id')

    # @@@ Need to exclude out users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]

    q = rq.Queue(connection=redis.Redis())

    for user_id in uids:
        q.enqueue(mail_user, user_id)

我的测试:

class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        self.assertItemsEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
另一答案

我承诺使用qazxsw poi:

a patch

这应该可以更轻松地测试RQ作业。希望有所帮助!

另一答案

我为这个案例做的是检测我是否正在测试,并在测试期间使用fakeredis。最后,在测试本身中,我将redis worker任务排入同步模式:

首先,定义一个检测您是否正在测试的函数:

from django.test impor TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        queue = get_queue(async=False)
        queue.enqueue(func) # func will be executed right away
        # Test for job completion

然后在您使用redis排队任务的文件中,以这种方式管理队列。如果需要,您可以扩展get_queue以指定队列名称:

TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'

def am_testing():
    return TESTING

然后,像这样排队你的任务:

if am_testing():
    from fakeredis import FakeStrictRedis 
    from rq import Queue
    def get_queue():
        return Queue(connection=FakeStrictRedis())

else:
    import django_rq
    def get_queue():
        return django_rq.get_queue()

最后,在您的测试程序中,以同步模式运行您正在测试的任务,以便它在与测试相同的过程中运行。作为一种惯例,我首先清除了fakeredis队列,但我认为没必要,因为没有工人:

queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)

我的settings.py具有正常的django_redis设置,因此django_rq.getqueue()在部署时使用这些:

from rq import Queue
from fakeredis import FakeStrictRedis

FakeStrictRedis().flushall()
queue = Queue(async=False, connection=FakeStrictRedis())
queue.enqueue(task_mytask, arg1, arg2)
另一答案

当队列中仍有作业时,您需要暂停测试。为此,您可以检查RQ_QUEUES = 'default': 'HOST': env_var('REDIS_HOST'), 'PORT': 6379, 'DB': 0, # 'PASSWORD': 'some-password', 'DEFAULT_TIMEOUT': 360, , 'high': 'HOST': env_var('REDIS_HOST'), 'PORT': 6379, 'DB': 0, 'DEFAULT_TIMEOUT': 500, , 'low': 'HOST': env_var('REDIS_HOST'), 'PORT': 6379, 'DB': 0, ,并在队列中仍有作业时暂停执行:

Queue.is_empty()
另一答案

以防这对任何人都有帮助。我使用了一个带有自定义模拟对象的补丁来执行可以立即运行的入队

import time
from django.utils.unittest import TestCase
import django_rq

class TestQueue(TestCase):

def test_something(self):
    # simulate some User actions which will queue up some tasks

    # Wait for the queued tasks to run
    queue = django_rq.get_queue('default')
    while not queue.is_empty():
        time.sleep(5) # adjust this depending on how long your tasks take to execute

    # queued tasks are done, check state of the DB
    self.assert(.....)

然后模拟对象只有一个方法:

#patch django_rq.get_queue
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
    #Perform web operation that starts job. In my case a post to a url
另一答案

我遇到了同样的问题。另外,我在我的工作中执行了例如一些邮件功能,然后想要检查Django测试邮箱,如果有任何电子邮件。但是,由于使用Django RQ,作业不会在与Django测试相同的上下文中执行,因此发送的电子邮件不会最终出现在测试邮箱中。

因此,我需要在相同的上下文中执行作业。这可以通过以下方式实现:

class MockBulkJobGetQueue(object):

    def enqueue(self, f, *args, **kwargs):
        # Call the function
        f(
            **kwargs.pop('kwargs', None)
        )

在这里,您只需从队列中获取所有作业,并在测试中直接执行它们。可以在TestCase类中很好地实现它并重用此功能。

以上是关于在Django中测试django-rq(python-rq)的最佳实践的主要内容,如果未能解决你的问题,请参考以下文章

自定义 Django 信号不起作用

Pytho之Django

ImportError: Couldn't import Django. Are you sure it's installed and available on your PYTHO

当多个工人完成一个未知大小的工作时如何得到通知?

错误:SMTPRecipientsRefused 553, '5.7.1 #while 在 django 中处理联系表单

如何在 django 模板中调用带有参数的 python 函数? [复制]