Celery:当排队太多时阻止添加更多任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Celery:当排队太多时阻止添加更多任务相关的知识,希望对你有一定的参考价值。
我有一个Flask
REST API,它利用Celery
来运行异步请求。
想法是async=1
查询参数指示应该异步处理请求(立即返回客户端将使用的任务ID)。
与此同时,我希望在等待处理太多时阻止接受新任务。
下面的代码可行,但accepting_new_tasks()
需要约2秒,这太慢了。
Celery中是否有配置(或其他东西)允许限制等待任务的数量;或者更快的方式来获得等待任务的数量?
import math
from celery import Celery
from flask import abort, Flask, jsonify, request
flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")
@flask_app.route("/")
def home():
async_ = request.args.get("async")
settings = request.args.get("settings")
if async_:
if not accepting_new_tasks(celery_app):
return abort(503)
task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
return jsonify({"taskId": task.id})
return jsonify({})
def accepting_new_tasks(celery_app):
inspector = celery_app.control.inspect()
nodes_stats = inspector.stats()
nodes_reserved = inspector.reserved()
workers = 0
for stats in nodes_stats.values():
workers += stats["pool"]["max-concurrency"]
waiting_tasks = 0
for reserved in nodes_reserved.values():
waiting_tasks += len(reserved)
return waiting_tasks < math.ceil(workers / 3)
答案
最终,我通过查询RabbitMQ管理API解决了这个问题,正如https://stackoverflow.com/a/27074594/4183498指出的那样。
import math
from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth
flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")
def get_workers_count():
inspector = celery_app.control.inspect()
nodes_stats = inspector.stats()
nodes_reserved = inspector.reserved()
workers = 0
for stats in nodes_stats.values():
workers += stats["pool"]["max-concurrency"]
return workers
WORKERS_COUNT = get_workers_count()
@flask_app.route("/")
def home():
async_ = request.args.get("async")
settings = request.args.get("settings")
if async_:
if not accepting_new_tasks(celery_app):
return abort(503)
task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
return jsonify({"taskId": task.id})
return jsonify({})
def accepting_new_tasks(celery_app):WORKERS_COUNT
auth = HTTPBasicAuth("guest", "guest")
response = get(
"http://localhost:15672/api/queues/my_vhost/celery",
auth=auth
)
waiting_tasks = response.json()["messages"]
return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
以上是关于Celery:当排队太多时阻止添加更多任务的主要内容,如果未能解决你的问题,请参考以下文章
如何判断任务是不是已经在 django-celery 中排队?
Celery add_periodic_task 阻止 Django 在 uwsgi 环境中运行