Celery/Redis 相同的任务被并行执行多次

Posted

技术标签:

【中文标题】Celery/Redis 相同的任务被并行执行多次【英文标题】:Celery/Redis same task being executed multiple times in parallel 【发布时间】:2014-09-14 18:04:48 【问题描述】:

我有 2 个自定义任务(TaskATaskB),都继承自 celery.Task。调度程序时不时地启动TaskATaskA 每次启动NTaskB 时使用不同的参数。但由于某种原因,有时相同的TaskB 具有相同的参数,会同时执行两次,这会导致数据库出现不同的问题。

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj

我尝试过的事情

我尝试使用celery.group,希望它能解决此类问题,但我得到的只是错误,说run 需要2 个参数,但没有提供任何参数。

这就是我尝试使用celery.group 启动TaskB 的方式:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()

我也试过这样:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()

g.apply_async()之前执行任务。

问题

问题是来自我如何启动任务还是其他原因?这是正常行为吗?

其他信息

在我的本地机器上,我使用RabbitMQ 3.3.4 运行celery 3.1.13,在服务器celery 3.1.13 上使用Redis 2.8.9 运行。 在本地机器上我看不到这样的行为,每个任务都执行一次。在服务器上,我看到 1 到 10 个这样的任务连续执行两次。

这就是我在本地机器和服务器上运行 celery 的方式:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200

可行的解决方法

我根据收到的参数对TaskB 进行了锁定。经过大约 10 个小时的测试,我看到究竟是什么被执行了两次,但是锁可以防止数据库发生冲突。 这确实解决了我的问题,但我仍然想了解为什么会这样。

【问题讨论】:

您的代码应该可以正常工作。我已经复制了your code to one file like this,并且在调用TaskA().apply_async() 后所有任务都已执行。您能否发布回溯以查看问题出在哪里? 回溯来自数据库。 MyModel 对 2 个字段具有唯一约束。因此,当任务第一次运行并创建一个新对象时,一切都很好,但随后再次运行相同的任务,并尝试再次创建相同的对象并抛出IntegrityError 使用您发布的代码无法复制您的问题。我认为您可以尝试为每个任务创建单独的 TaskB 实例,因为这可能是一个问题。试试看:g = celery.group([TaskB().s(id) for id in objects]) 我也尝试过,结果相同。我发布的代码非常接近我的实际任务。我将切换到在本地机器上的 redis 上运行 celery,也许这会给我更好的调试信息。 您可能想观看此演示文稿youtu.be/3cyq5DHjymw?t=24m20s 【参考方案1】:

您是否按照 Celery 的 Using Redis 文档中的说明设置了 fanout_prefixfanout_patterns?我正在将 Celery 与 Redis 一起使用,但我没有遇到这个问题。

【讨论】:

我遇到了同样的问题,一个任务排队其他任务,而其他任务被多次执行。如上所述设置fanout_prefixfanout_patterns 似乎已经解决了这个问题。使用 Celery 3.1.18 和 Kombu 3.0.30

以上是关于Celery/Redis 相同的任务被并行执行多次的主要内容,如果未能解决你的问题,请参考以下文章

Celery+python+redis异步执行定时任务

xadmin引入celery执行异步任务与定时任务

带有芹菜的Django:计划任务(ETA)并行执行多次

django+celery+redis环境配置

Flask+Celery+Redis实现队列化异步任务

我在使用 Celery、Redis 和 Django 时遇到问题