如何在芹菜中将任务从一个队列移动到另一个队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在芹菜中将任务从一个队列移动到另一个队列相关的知识,希望对你有一定的参考价值。

我需要能够做两件事,这两件事取决于我能够将任务从一个队列移动到另一个队列:

  1. 当任务失败一定数量的重试时,将其移动到另一个队列
  2. 当任务满足某些条件时,将其移动到不同的队列(将由不同的工作人员处理)

请注意,我的意思是将东西放在工作人员的一个任务中的另一个队列中 - 而不是从主应用程序中。

我能找到的唯一一段代码可以在这里引用 - https://stackoverflow.com/a/27144119/112050有人能指出正确的api来做到这一点吗?

答案

您移动任务的想法实际上归结为使用相同的参数运行相同的任务,但将其发送到不同的队列。

apply_async有一个queue参数

from celery.exceptions import MaxRetriesExceededError

@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def some_task(arg1, arg2):
    try:
        # task logic

        if some_condition:
            some_task.apply_async([arg1, arg2], queue='different_queue')
            return

    except MaxRetriesExceededError:
        some_task.apply_async([arg1, arg2], queue='different_queue')

    except Exception, exc:
        raise some_task.retry(exc=exc) 

如果您确实想在队列之间移动任务,假设您使用的是RabbitMQ,则可以使用Shovel Plugin。例如,将消息从队列q1移动到本地代理上的队列q2

rabbitmqctl set_parameter shovel my-shovel \
'{"src-uri": "amqp://", "src-queue": "q1", \
 "dest-uri": "amqp://", "dest-queue": "q2"}'

以上是关于如何在芹菜中将任务从一个队列移动到另一个队列的主要内容,如果未能解决你的问题,请参考以下文章

如何取消芹菜队列上的任务? [复制]

芹菜节拍队列包括过时的任务

芹菜 - 如何使用多个队列?

检索芹菜队列中的任务列表

如何防止芹菜执行相同的任务?

如何在不重复的情况下重试芹菜任务 - SQS