检索芹菜队列中的任务列表

Posted

技术标签:

【中文标题】检索芹菜队列中的任务列表【英文标题】:Retrieve list of tasks in a queue in Celery 【发布时间】:2011-07-29 12:20:16 【问题描述】:

如何检索队列中尚未处理的任务列表?

【问题讨论】:

RabbitMQ,但我想在 Python 中检索这个列表。 【参考方案1】:

编辑:查看其他答案以获取队列中的任务列表。

你应该看这里: Celery Guide - Inspecting Workers

基本上是这样的:

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

取决于你想要什么

【讨论】:

我试过了,但它真的很慢(比如 1 秒)。我在龙卷风应用程序中同步使用它来监控进度,所以它必须很快。 这不会返回队列中尚未处理的任务列表。 使用i.reserved() 获取排队任务列表。 在指定工人时,我必须使用列表作为参数:inspect(['celery@Flatty'])。与inspect() 相比,速度大幅提升。 这没有回答问题。我不知道为什么这个答案被接受...... :)【参考方案2】:

如果您使用的是 rabbitMQ,请在终端中使用:

sudo rabbitmqctl list_queues

它将打印带有待处理任务数量的队列列表。例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

右栏中的数字是队列中的任务数。在上面,celery 队列有 166 个待处理任务。

【讨论】:

当我拥有 sudo 权限时,我对此很熟悉,但我希望非特权系统用户能够检查 - 有什么建议吗? 此外,如果您想稍后处理该数字,例如统计数据,您可以通过grep -e "^celery\s" | cut -f2 管道提取该166【参考方案3】:

如果您使用 Celery+Django 最简单的方法来检查任务,直接在您的虚拟环境中使用终端中的命令或使用完整路径 芹菜:

文档:http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

此外,如果您使用的是 Celery+RabbitMQ,您可以使用以下命令检查队列列表

更多信息:https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

【讨论】:

如果你有定义项目,可以使用celery -A my_proj inspect reserved 这同样不能回答问题。【参考方案4】:

如果你不使用优先任务,如果你使用 Redis,这实际上是pretty simple。获取任务计数:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

但是,优先任务use a different key in redis,所以整体情况稍微复杂一些。完整的情况是您需要为任务的每个优先级查询 redis。在 python 中(以及来自 Flower 项目),这看起来像:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '012'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

如果你想得到一个实际的任务,你可以使用类似的东西:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

您必须从那里反序列化返回的列表。就我而言,我能够通过以下方式完成此操作:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

请注意,反序列化可能需要一点时间,您需要调整上面的命令以处理各种优先级。

【讨论】:

在生产中使用它后,我得知它fails if you use prioritized tasks,由于Celery的设计。 我已更新上述内容以处理优先任务。进步! 只是说明一下,默认使用的DATABASE_NUMBER0QUEUE_NAMEcelery,所以redis-cli -n 0 llen celery会返回排队消息的数量。跨度> 对于我的 celery,队列的名称是 '012' 而不是 '012'。除此之外,这非常有效! 它总是返回 0。【参考方案5】:

要从后端检索任务,请使用此

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

【讨论】:

但是 'jobs' 只给出队列中的任务数 请参阅***.com/a/57807913/9843399 获取相关答案,为您提供任务名称。【参考方案6】:

带有 json 序列化的 Redis 复制粘贴解决方案:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

它适用于 Django。只是不要忘记更改yourproject.celery

【讨论】:

如果您使用的是 pickle 序列化程序,则可以将 body = 行更改为 body = pickle.loads(base64.b64decode(j['body']))【参考方案7】:

这在我的应用程序中对我有用:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

active_jobs 将是与队列中的任务相对应的字符串列表。

不要忘记将 CELERY_APP_INSTANCE 换成你自己的。

感谢@ashish 在这里的回答为我指明了正确的方向:https://***.com/a/19465670/9843399

【讨论】:

在我的情况下,jobs 始终为零...有什么想法吗? @daveoncode 我认为这些信息不足以让我做出有益的回应。你可以打开你自己的问题。如果您指定要在 python 中检索信息,我认为它不会与此重复。我会回到***.com/a/19465670/9843399,这是我的回答所依据的,并确保它首先有效。 @CalebSyring 这是第一个真正向我展示排队任务的方法。很不错。对我来说唯一的问题是列表附加似乎不起作用。有什么想法可以让回调函数写入列表吗? @Varlor 对不起,有人对我的回答进行了不当编辑。您可以查看原始答案的编辑历史记录,这很可能对您有用。我正在努力解决这个问题。 (编辑:我刚进去并拒绝了编辑,它有一个明显的python错误。让我知道这是否解决了你的问题。) @CalebSyring 我现在在一个类中使用了你的代码,将列表作为类属性有效!【参考方案8】:

我认为获取正在等待的任务的唯一方法是保留您启动的任务列表,并让任务在启动时从列表中删除。

使用 rabbitmqctl 和 list_queues,您可以大致了解有多少任务正在等待,但不能了解任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果您想要的包括正在处理但尚未完成的任务,您可以保留一份任务列表并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者您让 Celery 使用 CELERY_RESULT_BACKEND 存储结果并检查您的哪些任务不在其中。

【讨论】:

【参考方案9】:

celery 检查模块似乎只从工人的角度了解任务。如果你想查看队列中的消息(尚未被worker拉取)我建议使用pyrabbit,它可以与rabbitmq http api接口从队列中检索各种信息。

可以在此处找到示例: Retrieve queue length with Celery (RabbitMQ, Django)

【讨论】:

【参考方案10】:

据我所知,Celery 没有提供用于检查队列中等待的任务的 API。这是特定于经纪人的。如果您使用 Redis 作为代理,则检查在 celery(默认)队列中等待的任务很简单:

    连接到代理 列出celery 列表中的项目(以LRANGE 命令为例)

请记住,这些是等待可用工作人员选择的任务。您的集群可能正在运行一些任务 - 这些任务不会在此列表中,因为它们已经被选中。

在特定队列中检索任务的过程是特定于代理的。

【讨论】:

【参考方案11】:

我得出的结论是,获取队列中作业数量的最佳方法是使用rabbitmqctl,正如这里多次建议的那样。为了允许任何选定的用户使用sudo 运行命令,我遵循了here 的说明(我确实跳过了编辑配置文件部分,因为我不介意在命令之前输入 sudo。)

我还获取了 jamesc 的 grepcut sn-p 并将其封装在子进程调用中。

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

【讨论】:

【参考方案12】:

如果您控制了任务的代码,那么您可以通过让任务在第一次执行时触发简单的重试,然后检查inspect().reserved() 来解决此问题。重试将任务注册到结果后端,芹菜可以看到。该任务必须接受selfcontext 作为第一个参数,以便我们可以访问重试计数。

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

此解决方案与代理无关,即。您不必担心是使用 RabbitMQ 还是 Redis 来存储任务。

编辑:经过测试,我发现这只是部分解决方案。 reserved 的大小仅限于 worker 的预取设置。

【讨论】:

【参考方案13】:
from celery.task.control import inspect
def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if self.key_in_list(task_id, task_list):
             return True
    return False

【讨论】:

对于 Celery > 5,您可以尝试:from your_app.celery import app 然后例如:app.control.inspect().active()【参考方案14】:

subprocess.run

import subprocess
import re
active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
                                        stdout=subprocess.PIPE).stdout.decode('utf-8')
return len(re.findall(r'worker_pid', active_process_txt))

注意把my_proj改成your_proj

【讨论】:

这不是问题的答案。这给出了活动任务列表(当前正在运行的任务)。问题是关于如何列出队列中等待的任务。

以上是关于检索芹菜队列中的任务列表的主要内容,如果未能解决你的问题,请参考以下文章

Django 学习之Celery(芹菜)

如何在芹菜中将任务从一个队列移动到另一个队列

芹菜停止执行链

Django Celery delay() 总是推送到默认的“芹菜”队列

芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略

芹菜不处理来自 RabbitMQ 的任务