Celery period_task 并行运行多次

Posted

技术标签:

【中文标题】Celery period_task 并行运行多次【英文标题】:Celery periodic_task running multiple times in parallel 【发布时间】:2011-12-04 20:59:03 【问题描述】:

我有一些使用 Celery 线程的非常简单的周期性代码;它只是打印“Pre”和“Post”并在两者之间休眠。改编自this *** question和this linked website

from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
  def task_exc(func):
    def wrapper(*args, **kwargs):
        lock_id = "celery-single-instance-" + func.__name__
        acquire_lock = lambda: cache.add(lock_id, "true", timeout)
        release_lock = lambda: cache.delete(lock_id)
        if acquire_lock():
            try:
                func()
            finally:
                release_lock()
    return wrapper
  return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

此代码从不打印'already in use...';当我使用 @single_instance_task 装饰器时也会出现同样的现象。

你知道怎么回事吗?

编辑:我已经简化了问题,使其不会写入内存(使用全局缓存或 django 缓存);我还是没见过'already in use...'


编辑:当我将以下代码添加到我的 Django settings.py 文件中时(通过将代码从 https://docs.djangoproject.com/en/dev/topics/cache/ 更改为所希望的一切,但仅当我使用端口 11211 时(奇怪的是,我的服务器在 8000 端口)

CACHES = 
    'default': 
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    

【问题讨论】:

【参考方案1】:

你是如何运行 celeryd 的?我不熟悉线程选项。

如果它正在运行多进程,则没有“全局”变量是工作人员之间的共享内存。

如果你想在所有工作人员之间共享一个计数器,那么我建议你使用cache.incr

例如:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

更新

如果你通过睡眠来强制你的任务重叠会发生什么,例如:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

如果您在 20 秒内更频繁地运行此程序并没有得到“已经在使用...”,那么您就知道缓存没有按预期运行。


另一个更新

您是否在 django 设置中设置了缓存后端?例如。内存缓存

如果不是,您可能正在使用Dummy Cache,它实际上并没有做任何缓存,只是实现了接口...这听起来像是一个令人信服的问题原因。 p>

【讨论】:

+1 这听起来与我的问题有关。我尝试使用缓存,但仍然看到 counter 的不稳定值。另外,我看到多个工作人员进入了test 函数。我正在用 django 运行 celeryd:python manage.py celeryd -v 2 -B -s celery -E -l INFO 即使我简化了 test 函数只打印“hello”,它在不同的工作人员上运行并且打印过于频繁(即使我定义了 @single_instance_task 装饰器)。跨度> 我已经简化了代码(上面),以便它只打印(如你所建议的那样)。它仍然从不打印'already in use...';不知何故,缓存没有成功锁定。 您使用的是哪个缓存后端?我在 memcached 中成功使用了这个配方。 啊——我愿意from django.core.cache import cache;那是来自 ask.github.com/celery/cookbook/tasks.html 的那个。此外,允许 celery 并发 >1 也很重要。 concurrency = 1 时,它永远不会产生错误,但永远不会打印 'already in use...'

以上是关于Celery period_task 并行运行多次的主要内容,如果未能解决你的问题,请参考以下文章

Celery/Redis 相同的任务被并行执行多次

带有芹菜的Django:计划任务(ETA)并行执行多次

Python 并行分布式框架 Celery

Python 并行分布式框架 Celery

防止 Celery Beat 运行相同的任务

[源码解析] 并行分布式框架 Celery 之 Lamport 逻辑时钟 & Mingle