Celery 任务计划(确保一个任务一次只执行一个)

Posted

技术标签:

【中文标题】Celery 任务计划(确保一个任务一次只执行一个)【英文标题】:Celery task schedule (Ensuring a task is only executed one at a time) 【发布时间】:2012-08-13 18:00:35 【问题描述】:

我有一个任务,有点像这样:

@task()
def async_work(info):
    ...

在任何时候,我都可以调用 async_work 并提供一些信息。出于某种原因,我需要确保一次只运行一个 async_work,其他调用请求必须等待。

所以我想出了以下代码:

is_locked = False    
@task()
def async_work(info):
    while is_locked:
        pass
    is_locked = True
    ...
    is_locked = False

但是它说访问局部变量是无效的... 如何解决?

【问题讨论】:

你能发布确切的错误信息吗? 【参考方案1】:

您可能不想为您的 celery 工人使用 concurrency=1 - 您希望同时处理您的任务。相反,您可以使用某种锁定机制。 只需确保缓存超时大于完成任务的时间。

Redis

import redis
from contextlib import contextmanager

redis_client = redis.Redis(host='localhost', port=6378)


@contextmanager
def redis_lock(lock_name):
    """Yield 1 if specified lock_name is not already set in redis. Otherwise returns 0.

    Enables sort of lock functionality.
    """
    status = redis_client.set(lock_name, 'lock', nx=True)
    try:
        yield status
    finally:
        redis_client.delete(lock_name)


@task()
def async_work(info):
    with redis_lock('my_lock_name') as acquired:
        do_some_work()

内存缓存

受celery documentation启发的示例

from contextlib import contextmanager
from django.core.cache import cache

@contextmanager
def memcache_lock(lock_name):
    status = cache.add(lock_name, 'lock')
    try:
        yield status
    finally:
        cache.delete(lock_name)


@task()
def async_work(info):
    with memcache_lock('my_lock_name') as acquired:
        do_some_work() 

【讨论】:

这个答案可以改进——例如,我们如何确保超时更大?不知道如何做到这一点的人只需要创建另一个问题。 确保只有在获得缓存锁时才删除它,否则其他所有任务都会执行(因为它们会来回添加和删除锁)。【参考方案2】:

我已经实现了一个装饰器来处理这个问题。它基于 Celery 官方文档中的 Ensuring a task is only executed one at a time。

它使用函数的名称和它的 args 和 kwargs 来创建一个 lock_id,它在 Django 的缓存层中设置/获取(我只用 Memcached 测试过这个,但它也应该适用于 Redis)。如果 lock_id 已经设置在缓存中,它会将任务放回队列并退出。

CACHE_LOCK_EXPIRE = 30


def no_simultaneous_execution(f):
    """
    Decorator that prevents a task form being executed with the
    same *args and **kwargs more than one at a time.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create lock_id used as cache key
        lock_id = '--'.format(self.name, args, kwargs)

        # Timeout with a small diff, so we'll leave the lock delete
        # to the cache if it's close to being auto-removed/expired
        timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3

        # Try to acquire a lock, or put task back on queue
        lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
        if not lock_acquired:
            self.apply_async(args=args, kwargs=kwargs, countdown=3)
            return

        try:
            f(self, *args, **kwargs)
        finally:
            # Release the lock
            if monotonic() < timeout_at:
                cache.delete(lock_id)
    return wrapper

然后您可以将它作为第一个装饰器应用于任何任务:

@shared_task(bind=True, base=MyTask)
@no_simultaneous_execution
def sometask(self, some_arg):
  ...

【讨论】:

【参考方案3】:

访问局部变量是无效的,因为您可以让多个 celery worker 运行任务。这些工人甚至可能在不同的主机上。因此,基本上,is_locked 变量实例的数量与 Celery 工作人员正在运行的数量一样多 你的async_work 任务。因此,即使您的代码不会引发任何错误,您也不会获得预期的效果。

要实现您的目标,您需要将 Celery 配置为仅运行一名工作人员。由于任何工作人员都可以在任何给定时间处理单个任务,因此您可以获得所需的内容。

编辑:

根据Workers Guide > Concurrency:

默认情况下,多处理用于执行并发执行 任务,但您也可以使用 Eventlet。工人数 可以使用--concurrency 参数更改进程/线程 并默认为机器上可用的 CPU 数量。

因此你需要像这样运行worker:

$ celery worker --concurrency=1

编辑 2:

令人惊讶的是还有另一种解决方案,而且它甚至在官方文档中,参见Ensuring a task is only executed one at a time 文章。

【讨论】:

您能更明确地解释一下吗?如何配置 Celery 只运行一个工人? "$ celery worker --concurrency=1"效果很好!真的谢谢。 你不是通过强制并发为 1 来阻塞整个应用程序吗?

以上是关于Celery 任务计划(确保一个任务一次只执行一个)的主要内容,如果未能解决你的问题,请参考以下文章

防止 Celery Beat 运行相同的任务

如何确保 Celery 任务是防止重叠的 Celery 任务执行

Celery Beat:一次限制为单个任务实例

Celery-定时任务

Celery 为每个任务创建一个新连接

【celery】任务重复执行