防止 Celery Beat 运行相同的任务

Posted

技术标签:

【中文标题】防止 Celery Beat 运行相同的任务【英文标题】:Prevent Celery Beat from running the same task 【发布时间】:2015-10-22 05:38:44 【问题描述】:

我有一个计划的 celery 每 30 秒运行一次任务。我有一个每天作为任务运行,另一个每周在用户指定的时间和星期几运行。它检查“开始时间”和“下一个预定日期”。在任务完成之前,下一个计划日期不会更新。

但是,我想知道如何确保 celery beat 只运行一次任务。我现在看到,celery 将多次运行某个任务,直到该任务的下一个计划日期被更新。

【问题讨论】:

【参考方案1】:

为了做到这一点,您需要实现某种“分布式锁”,解决这个问题的一种简单可靠的方法是使用带有 memcached 后端的 django 缓存,并在任务开始时在其中设置一个“标志”就在它完成删除该标志之前。其他选择是使用“redis”锁作为“分布式锁”。 使用django缓存memcached作为后端的例子:

@shared_task
def my_task(arg1, arg2, lock_expire=300):
    lock_key = 'my_task'
    acquire_lock = lambda: cache.add(lock_key, '1', lock_expire)
    release_lock = lambda: cache.delete(lock_key)

    if acquire_lock():
        try:
            # Execute your code here!
        except Exception:
            # error handling here
        finally:
            # release allow other task to execute
            release_lock()
    else:
        print("Other task is running, skipping")

上面的代码实现了一个“分布式锁”,以确保无论您尝试再次执行多少次,都只能运行一个任务。 锁只能由一个任务获取:),另一个将跳过“主块”并完成。 你觉得有意义吗?

玩得开心!

【讨论】:

我试过了,但我的任务仍在重复。我还从我的 celery 日志中看到了这条消息:“ WARNING/Worker-3] /usr/lib/python2.7/site-packages/django/core/cache/backends/base.py:224: CacheKeyWarning: Cache key contains characters如果与 memcached 一起使用会导致错误:u':1:tasks object' CacheKeyWarning)" 你用什么作为“钥匙”?您似乎正在尝试使用任务本身。在这种情况下,事情可能会出错。现在很好,您应该使用我之前指出的“字符串”,例如“my_only_one_task”。谢谢! 嗨@mar​​tin-alderete,我对字符串应该做什么感到困惑。字符串应该与什么相关联?谢谢! 嘿@shizznetz,我刚刚阅读了这条评论。根据您的要求,字符串“lock_key”应该是您的任务的标识符。例如,如果您想一次只为给定用户执行一项任务,在这种情况下,您可以使用 lock_key,如下所示: lock_key = 'my_task:0'.format(user.id) 希望您明白这一点!提前致谢!

以上是关于防止 Celery Beat 运行相同的任务的主要内容,如果未能解决你的问题,请参考以下文章

如何防止芹菜执行相同的任务?

Celery Beat:一次限制为单个任务实例

使用 celery-beat 完成上一个任务后,如何在 5 分钟内运行任务?

在 Heroku 上运行的 Celery beat 进程发送任务两次

使用 Flask 动态调度 Celery Beat 任务

根据某些模型日期字段运行 Django Celery Beat 任务