结束芹菜工人的任务、时间限制、工作阶段或客户的指示

Posted

技术标签:

【中文标题】结束芹菜工人的任务、时间限制、工作阶段或客户的指示【英文标题】:End Celery worker task on, time limit, job stage or instruction from client 【发布时间】:2012-03-20 08:13:23 【问题描述】:

我是 celery 新手,如果我还没有写过设计模式(或示例代码),我将不胜感激。

以下是对工人所需特征的描述。

worker 将运行一项任务,该任务从无穷无尽的来源(生成器)收集数据。 除非被指示停止,否则工作任务将永远从生成器中运行。

工作任务应在发生以下任一触发器时正常停止。

    它超过了以秒为单位的执行时间限制。 它超过了无限生成器循环的迭代次数。 客户端发送消息指示工作任务立即完成。

以下是一些 sudo 代码,说明我认为我需要如何处理触发场景 1 和 2。

我不知道我如何从客户端发送“立即完成”信号,以及它是如何在工作任务中接收和执行的。

任何建议或示例代码将不胜感激。

from celery.task import task
from celery.exceptions import SoftTimeLimitExceeded

COUNTLIMIT = # some value sent to the worker task by the client

@task()
def getData():
    try:
        for count, data in enumerate(endlessGeneratorThing()):
            # process data here
            if count > COUNTLIMIT: # Handle trigger scenario 2
                clean_up_task_nicely()
                break 
    except SoftTimeLimitExceeded:  # Handle trigger scenario 1
            clean_up_task_nicely()

【问题讨论】:

您可以通过撤销***.com/questions/8920643/…来终止正在执行的任务 【参考方案1】:

我对 revoke 的理解是它只在执行之前撤销任务。对于(3),我认为您想要做的是使用 AbortableTask,它提供了一种协作方式来结束任务:

http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html

在客户端你可以调用task.abort(),在任务端你可以轮询task.is_aborted()

【讨论】:

以上是关于结束芹菜工人的任务、时间限制、工作阶段或客户的指示的主要内容,如果未能解决你的问题,请参考以下文章

从芹菜任务中获取芹菜工人的名字?

芹菜工人并发

Celery Beat:一次限制为单个任务实例

芹菜工人的水平尺度导致相同的处理时间

芹菜工人在当前任务完成后不会再接新任务

有没有办法非暴力地停止芹菜工人的特定任务?