用户启动的后台进程数量有限
Posted
技术标签:
【中文标题】用户启动的后台进程数量有限【英文标题】:limited number of user-initiated background processes 【发布时间】:2017-01-17 21:39:45 【问题描述】:我需要允许用户为非常非常大的工作提交请求。我们说的是 100 GB 的内存和 20 小时的计算时间。这花费了我们公司很多钱,所以规定任何时候只能运行 2 个作业,并且当 2 个已经在运行时请求新作业将被拒绝(并且用户通知服务器忙)。
我当前的解决方案使用来自 concurrent.futures 的 Executor,并且需要将 Apache 服务器设置为仅运行一个进程,从而降低响应速度(当前用户数非常少,所以暂时还可以)。
如果可能的话,我想为此使用 Celery,但我在文档中没有看到任何方法可以完成此特定设置。
如何在 Django 应用程序的后台运行数量有限的作业,并在作业因服务器繁忙而被拒绝时通知用户?
【问题讨论】:
【参考方案1】:对于这种特殊情况,我有两种解决方案,一种是 celery 开箱即用的解决方案,另一种是您自己实现的。
-
你可以对芹菜工人做这样的事情。特别是,您只创建了两个并发=1的工作进程(或者,一个并发=2的工作进程,但这将是线程,而不是不同的进程),这样,只能完成两项工作异步。现在您需要一种在两个作业都被占用时引发异常的方法,然后使用inspect 来计算活动任务的数量并在需要时抛出异常。如需实施,您可以查看this SO post。
您可能还对rate limits 感兴趣。
您可以使用选择的锁定解决方案自己完成所有操作。特别是,一个很好的实现可以确保只有两个进程使用 redis(和 redis-py)运行,如下所示。 (考虑到你知道redis,因为你知道celery)
from redis import StrictRedis
redis = StrictRedis('localhost', '6379')
locks = ['compute:lock1', 'compute:lock2']
for key in locks:
lock = redis.lock(key, blocking_timeout=5)
acquired = lock.acquire()
if acquired:
do_huge_computation()
lock.release()
break
print("Gonna try next possible slot")
if not acquired:
raise SystemLimitsReached("Already at max capacity !")
这样可以确保系统中只能存在两个正在运行的进程。第三个进程将在lock.acquire()
行中阻塞 blocking_timeout 秒,如果锁定成功,acquired
将为 True,否则为 False,您会告诉用户等待!
过去某个时候我有同样的要求,我最终编写的代码类似于上面的解决方案。特别是
-
这具有尽可能少的竞争条件
易于阅读
不依赖于系统管理员,突然将负载下工作人员的并发性加倍并炸毁整个系统。
您还可以实现每个用户的限制,这意味着每个用户可以同时运行 2 个作业,只需更改 compute:lock1 到 compute:userId:lock1 和 lock2 相应地。你不能用香草芹菜来做这个。
【讨论】:
两种解决方案!谢谢。【参考方案2】:第一
您需要SpiXel's solution 的第一部分。据他说,“你只创建了两个并发=1的工作进程”。
第二
为队列中等待的任务设置time out,根据how to limit number of tasks in queue and stop feeding when full?设置CELERY_EVENT_QUEUE_TTL和队列长度限制。
因此,当这两个工作正在运行作业,并且队列中的任务等待 10 秒或您喜欢的任何时间段时,该任务将超时。或者如果队列已经完成,新到达的任务将被丢弃。
第三
你需要额外的东西来处理通知“当作业因为服务器繁忙而被拒绝时通知用户”。
Dead Letter Exchanges 是您所需要的。每次任务因队列长度限制或消息超时而失败。 “一旦达到限制,消息将从队列的前面被丢弃或死信,以便为新消息腾出空间。”
您可以设置“x-dead-letter-exchange”路由到另一个队列,一旦这个队列收到死信消息,您可以向用户发送通知消息。
【讨论】:
值得注意的是,这将有助于将该队列中同时运行的任务数限制为 2。而不是 OP 的问题暗示的每个用户的任务数可能很重要。我对此没有解决方案,但觉得值得注意。 @KeithBailey,你是对的。我认为 SpiXel 的解决方案是正确的。为队列设置超时时间。【参考方案3】:首先,您需要限制工作线程的并发性 (docs):
celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>
这将有助于确保您的活动任务不超过 2 个,即使代码中出现错误也是如此。
现在您有 2 种方法来实现任务编号验证:
您可以使用inspect 获取活动和计划任务的数量:
from celery import current_app
def start_job():
inspect = current_app.control.inspect()
active_tasks = inspect.active() or
scheduled_tasks = inspect.scheduled() or
worker_key = 'celery@%s' % <worker_name>
worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])
if len(worker_tasks) >= 2:
raise MyCustomException('It is impossible to start more than 2 tasks.')
else:
my_task.delay()
您可以将当前正在执行的任务数存储在数据库中,并据此验证任务执行情况。
如果您想扩展您的功能,第二种方法可能会更好 - 引入高级用户或不允许一个用户执行 2 个请求。
【讨论】:
感谢您抽出宝贵时间回答这个问题。不过,我认为if len(worker_tasks) >= 2
行存在竞争条件。如果两个进程/线程在 worker_tasks 仅为 1 时同时到达该行,那么它们都可以执行 else
子句,不是吗?有没有办法在添加工作时专门锁定芹菜?
我不知道如何直接在 Celery 中进行。只能建议采用#2 方式并处理数据库中的并发性。以上是关于用户启动的后台进程数量有限的主要内容,如果未能解决你的问题,请参考以下文章