使用 django、celery 和 redis 的一项一项任务

Posted

技术标签:

【中文标题】使用 django、celery 和 redis 的一项一项任务【英文标题】:One by one task using django, celery and redis 【发布时间】:2020-08-30 19:36:09 【问题描述】:

我使用 django、celery 和 redis 异步启动任务。

# tasks.py
@shared_task
def my_task():
    # Do stuff of task 1
    return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()

使用该配置,my_task 在 2 个不同的文件上启动 2 次。所以它们几乎同时在不同的线程上执行。

我需要将这 2 个任务一一执行。如果 my_task #1 正在执行并且另一个 my_task #2 已启动,我需要 my_task #2 等待 #1 结束后再执行。

我不想只使用一个线程将参数传递给 celery celery worker --concurrency=1

我的 settings.py 中的 celery 配置是基本的:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

我发现很多资源都在谈论这个主题,但我真的不明白如何实现我的目标

https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time Running "unique" tasks with celery http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

【问题讨论】:

【参考方案1】:

http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html 上的解决方案几乎奏效了。这是一些改编:

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=True)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
    """Run task."""
    print("test")

问题是,我没有在我的 settings.py 中的任何地方配置 Redis,所以我不明白它是如何找到正确的 redis 数据库的。我猜它来自 celery 的配置。

【讨论】:

以上是关于使用 django、celery 和 redis 的一项一项任务的主要内容,如果未能解决你的问题,请参考以下文章

使用 django、celery 和 redis 的一项一项任务

Django - 如何在 celery 和 redis 中使用异步任务队列

django redis celery 和 celery beats 的正确设置

Celery 与 Django 中的 Redis 代理:任务成功执行,但仍有太多持久的 Redis 键和连接

Django使用Celery加redis执行异步任务

在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存