Django Celery 获取任务计数

Posted

技术标签:

【中文标题】Django Celery 获取任务计数【英文标题】:Django Celery get task count 【发布时间】:2013-09-09 00:18:40 【问题描述】:

我目前正在使用 django 和 celery,一切正常。

但是,如果服务器过载,我希望能够通过检查当前计划了多少任务,让用户有机会取消任务。

我怎样才能做到这一点?

我使用 redis 作为代理。

我刚刚发现了这个: Retrieve list of tasks in a queue in Celery

这在某种程度上与我的问题有关,但我不需要列出任务,只需计算它们 :)

【问题讨论】:

【参考方案1】:

这是使用与代理无关的 celery 获取队列中消息数量的方法。

通过使用connection_or_acquire,您可以利用 celery 的内部连接池来最大限度地减少与您的代理的打开连接数。

celery = Celery(app)

with celery.connection_or_acquire() as conn:
    conn.default_channel.queue_declare(
        queue='my-queue', passive=True).message_count

您还可以扩展 Celery 以提供此功能:

from celery import Celery as _Celery


class Celery(_Celery)

    def get_message_count(self, queue):
        '''
        Raises: amqp.exceptions.NotFound: if queue does not exist
        '''
        with self.connection_or_acquire() as conn:
            return conn.default_channel.queue_declare(
                queue=queue, passive=True).message_count


celery = Celery(app)
num_messages = celery.get_message_count('my-queue')

【讨论】:

请提供某种解释来支持您的回答。 @Lal 添加了对该方法的一些解释 - 希望对您有所帮助! amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - vhost '/' 中没有队列'default' 因为我的队列不在'/' 主机上它在'/apples' 主机上.我如何到达该主机? 将被动设置为 'False' 也可以工作并规避 404 NOT FOUND 问题。 @Mario 最终会创建不存在的 Exchange,但这很可能是不希望的【参考方案2】:

如果你的broker配置为redis://localhost:6379/1,你的任务提交到通用celery队列,那么你可以通过以下方式获取长度:

import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)

或者,从 shell 脚本(适用于监视器等):

$ redis-cli -n 1 -h localhost -p 6379 llen celery

【讨论】:

尽管这对于 redis 代理来说是一个正确的解决方案,但请将 @stephen Fuhry 的评论标记为正确的解决方案,因为它与代理无关。【参考方案3】:

如果你已经在你的应用中配置了redis,你可以试试这个:

from celery import Celery

QUEUE_NAME = 'celery'

celery = Celery(app)
client = celery.connection().channel().client

length = client.llen(QUEUE_NAME)

【讨论】:

对于redis,client = app.broker_connection().channel().client 这将在您每次运行此代码时创建一个新的挂起 Redis 连接。您必须释放打开的连接和通道。【参考方案4】:

获取一个Celery使用的redis客户端实例,然后检查队列长度。每次使用时不要忘记释放连接(使用.acquire):

# Get a configured instance of celery:
from project.celery import app as celery_app

def get_celery_queue_len(queue_name):
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)

始终从池中获取连接,不要手动创建它。否则,您的 redis 服务器将用完连接槽,这将杀死您的其他客户端。

【讨论】:

【参考方案5】:

我将围绕未找到错误扩展 @StephenFuhry 的答案,因为即使 Celery 建议 mess with brokers directly,检索队列长度的或多或少与代理无关的方式也是有益的。在 Celery 4(使用 Redis 代理)中,此错误如下所示:

ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'

观察:

    ChannelError 是一个 kombu 异常(事实上,它是 amqpkombu “重新导出”它)。

    在 Redis 代理 Celery/Kombu 上将队列表示为 Redis 列表

    Redis 集合类型键为removed whenever the collection becomes empty

    如果我们看看queue_declare 做了什么,它有these lines:

    if passive and not self._has_queue(queue, **kwargs):
        raise ChannelError(...)
    

    Kombu Redis 虚拟传输的_has_queue 是this:

    def _has_queue(self, queue, **kwargs):
        with self.conn_or_acquire() as client:
            with client.pipeline() as pipe:
                for pri in self.priority_steps:
                    pipe = pipe.exists(self._q_for_pri(queue, pri))
                return any(pipe.execute())
    

结论是,在从queue_declare 引发的 Redis 代理 ChannelError 上是可以的(当然对于现有队列),只是意味着队列是空的。

这是一个如何输出所有活动 Celery 队列长度的示例(通常应该为 0,除非您的工作人员无法处理任务)。

from kombu.exceptions import ChannelError

def get_queue_length(name):
    with celery_app.connection_or_acquire() as conn: 
        try:
            ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
        except ChannelError:
            return 0
        else:
            return ok_nt.message_count
        
for queue_info in celery_app.control.inspect().active_queues().values():
    print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))

【讨论】:

以上是关于Django Celery 获取任务计数的主要内容,如果未能解决你的问题,请参考以下文章

xadmin引入celery执行异步任务与定时任务

Celery的使用

celery django 守护进程上的多个工作人员和多个队列

Celery框架的基本使用与介绍

Django 项目celery beat报错:Pidfile already exists

弹性豆茎上自动缩放的 django 应用程序的多个 celery beat 实例