Django Celery 获取任务计数
Posted
技术标签:
【中文标题】Django Celery 获取任务计数【英文标题】:Django Celery get task count 【发布时间】:2013-09-09 00:18:40 【问题描述】:我目前正在使用 django 和 celery,一切正常。
但是,如果服务器过载,我希望能够通过检查当前计划了多少任务,让用户有机会取消任务。
我怎样才能做到这一点?
我使用 redis 作为代理。
我刚刚发现了这个: Retrieve list of tasks in a queue in Celery
这在某种程度上与我的问题有关,但我不需要列出任务,只需计算它们 :)
【问题讨论】:
【参考方案1】:这是使用与代理无关的 celery 获取队列中消息数量的方法。
通过使用connection_or_acquire
,您可以利用 celery 的内部连接池来最大限度地减少与您的代理的打开连接数。
celery = Celery(app)
with celery.connection_or_acquire() as conn:
conn.default_channel.queue_declare(
queue='my-queue', passive=True).message_count
您还可以扩展 Celery 以提供此功能:
from celery import Celery as _Celery
class Celery(_Celery)
def get_message_count(self, queue):
'''
Raises: amqp.exceptions.NotFound: if queue does not exist
'''
with self.connection_or_acquire() as conn:
return conn.default_channel.queue_declare(
queue=queue, passive=True).message_count
celery = Celery(app)
num_messages = celery.get_message_count('my-queue')
【讨论】:
请提供某种解释来支持您的回答。 @Lal 添加了对该方法的一些解释 - 希望对您有所帮助! amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - vhost '/' 中没有队列'default' 因为我的队列不在'/' 主机上它在'/apples' 主机上.我如何到达该主机? 将被动设置为 'False' 也可以工作并规避 404 NOT FOUND 问题。 @Mario 最终会创建不存在的 Exchange,但这很可能是不希望的【参考方案2】:如果你的broker配置为redis://localhost:6379/1
,你的任务提交到通用celery
队列,那么你可以通过以下方式获取长度:
import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)
或者,从 shell 脚本(适用于监视器等):
$ redis-cli -n 1 -h localhost -p 6379 llen celery
【讨论】:
尽管这对于 redis 代理来说是一个正确的解决方案,但请将 @stephen Fuhry 的评论标记为正确的解决方案,因为它与代理无关。【参考方案3】:如果你已经在你的应用中配置了redis,你可以试试这个:
from celery import Celery
QUEUE_NAME = 'celery'
celery = Celery(app)
client = celery.connection().channel().client
length = client.llen(QUEUE_NAME)
【讨论】:
对于redis,client = app.broker_connection().channel().client
这将在您每次运行此代码时创建一个新的挂起 Redis 连接。您必须释放打开的连接和通道。【参考方案4】:
获取一个Celery使用的redis客户端实例,然后检查队列长度。每次使用时不要忘记释放连接(使用.acquire
):
# Get a configured instance of celery:
from project.celery import app as celery_app
def get_celery_queue_len(queue_name):
with celery_app.pool.acquire(block=True) as conn:
return conn.default_channel.client.llen(queue_name)
始终从池中获取连接,不要手动创建它。否则,您的 redis 服务器将用完连接槽,这将杀死您的其他客户端。
【讨论】:
【参考方案5】:我将围绕未找到错误扩展 @StephenFuhry 的答案,因为即使 Celery 建议 mess with brokers directly,检索队列长度的或多或少与代理无关的方式也是有益的。在 Celery 4(使用 Redis 代理)中,此错误如下所示:
ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'
观察:
ChannelError
是一个 kombu
异常(事实上,它是 amqp
和 kombu
“重新导出”它)。
在 Redis 代理 Celery/Kombu 上将队列表示为 Redis 列表
Redis 集合类型键为removed whenever the collection becomes empty
如果我们看看queue_declare
做了什么,它有these lines:
if passive and not self._has_queue(queue, **kwargs):
raise ChannelError(...)
Kombu Redis 虚拟传输的_has_queue
是this:
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
结论是,在从queue_declare
引发的 Redis 代理 ChannelError
上是可以的(当然对于现有队列),只是意味着队列是空的。
这是一个如何输出所有活动 Celery 队列长度的示例(通常应该为 0,除非您的工作人员无法处理任务)。
from kombu.exceptions import ChannelError
def get_queue_length(name):
with celery_app.connection_or_acquire() as conn:
try:
ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
except ChannelError:
return 0
else:
return ok_nt.message_count
for queue_info in celery_app.control.inspect().active_queues().values():
print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))
【讨论】:
以上是关于Django Celery 获取任务计数的主要内容,如果未能解决你的问题,请参考以下文章
celery django 守护进程上的多个工作人员和多个队列