用户启动的后台进程数量有限

Posted

技术标签:

【中文标题】用户启动的后台进程数量有限【英文标题】:limited number of user-initiated background processes 【发布时间】:2017-01-17 21:39:45 【问题描述】:

我需要允许用户为非常非常大的工作提交请求。我们说的是 100 GB 的内存和 20 小时的计算时间。这花费了我们公司很多钱,所以规定任何时候只能运行 2 个作业,并且当 2 个已经在运行时请求新作业将被拒绝(并且用户通知服务器忙)。

我当前的解决方案使用来自 concurrent.futures 的 Executor,并且需要将 Apache 服务器设置为仅运行一个进程,从而降低响应速度(当前用户数非常少,所以暂时还可以)。

如果可能的话,我想为此使用 Celery,但我在文档中没有看到任何方法可以完成此特定设置。

如何在 Django 应用程序的后台运行数量有限的作业,并在作业因服务器繁忙而被拒绝时通知用户?

【问题讨论】:

【参考方案1】:

对于这种特殊情况,我有两种解决方案,一种是 celery 开箱即用的解决方案,另一种是您自己实现的。

    你可以对芹菜工人做这样的事情。特别是,您只创建了两个并发=1的工作进程(或者,一个并发=2的工作进程,但这将是线程,而不是不同的进程),这样,只能完成两项工作异步。现在您需要一种在两个作业都被占用时引发异常的方法,然后使用inspect 来计算活动任务的数量并在需要时抛出异常。如需实施,您可以查看this SO post。

您可能还对rate limits 感兴趣。

    您可以使用选择的锁定解决方案自己完成所有操作。特别是,一个很好的实现可以确保只有两个进程使用 redis(和 redis-py)运行,如下所示。 (考虑到你知道redis,因为你知道celery)

    from redis import StrictRedis
    
    redis = StrictRedis('localhost', '6379')
    locks = ['compute:lock1', 'compute:lock2']
    for key in locks:
        lock = redis.lock(key, blocking_timeout=5)
        acquired = lock.acquire()
        if acquired:
            do_huge_computation()
            lock.release()
            break
        print("Gonna try next possible slot")
    
    if not acquired:
        raise SystemLimitsReached("Already at max capacity !")
    

这样可以确保系统中只能存在两个正在运行的进程。第三个进程将在lock.acquire() 行中阻塞 blocking_timeout 秒,如果锁定成功,acquired 将为 True,否则为 False,您会告诉用户等待!

过去某个时候我有同样的要求,我最终编写的代码类似于上面的解决方案。特别是

    这具有尽可能少的竞争条件 易于阅读 不依赖于系统管理员,突然将负载下工作人员的并发性加倍并炸毁整个系统。 您还可以实现每个用户的限制,这意味着每个用户可以同时运行 2 个作业,只需更改 compute:lock1 到 compute:userId:lock1 和 lock2 相应地。你不能用香草芹菜来做这个。

【讨论】:

两种解决方案!谢谢。【参考方案2】:

第一

您需要SpiXel's solution 的第一部分。据他说,“你只创建了两个并发=1的工作进程”。

第二

为队列中等待的任务设置time out,根据how to limit number of tasks in queue and stop feeding when full?设置CELERY_EVENT_QUEUE_TTL和队列长度限制

因此,当这两个工作正在运行作业,并且队列中的任务等待 10 秒或您喜欢的任何时间段时,该任务将超时。或者如果队列已经完成,新到达的任务将被丢弃。

第三

你需要额外的东西来处理通知“当作业因为服务器繁忙而被拒绝时通知用户”。

Dead Letter Exchanges 是您所需要的。每次任务因队列长度限制或消息超时而失败。 “一旦达到限制,消息将从队列的前面被丢弃或死信,以便为新消息腾出空间。”

您可以设置“x-dead-letter-exchange”路由到另一个队列,一旦这个队列收到死信消息,您可以向用户发送通知消息。

【讨论】:

值得注意的是,这将有助于将该队列中同时运行的任务数限制为 2。而不是 OP 的问题暗示的每个用户的任务数可能很重要。我对此没有解决方案,但觉得值得注意。 @KeithBailey,你是对的。我认为 SpiXel 的解决方案是正确的。为队列设置超时时间。【参考方案3】:

首先,您需要限制工作线程的并发性 (docs):

celery -A proj worker --loglevel=INFO --concurrency=2 -n <worker_name>

这将有助于确保您的活动任务不超过 2 个,即使代码中出现错误也是如此。

现在您有 2 种方法来实现任务编号验证:

    您可以使用inspect 获取活动和计划任务的数量:

     from celery import current_app
    
     def start_job():
          inspect = current_app.control.inspect()
          active_tasks = inspect.active() or 
          scheduled_tasks = inspect.scheduled() or 
          worker_key = 'celery@%s' % <worker_name>
          worker_tasks = active_tasks.get(worker_key, []) + scheduled_tasks.get(worker_key, [])
          if len(worker_tasks) >= 2:
              raise MyCustomException('It is impossible to start more than 2 tasks.') 
          else:
              my_task.delay()
    

    您可以将当前正在执行的任务数存储在数据库中,并据此验证任务执行情况。

如果您想扩展您的功能,第二种方法可能会更好 - 引入高级用户或不允许一个用户执行 2 个请求。

【讨论】:

感谢您抽出宝贵时间回答这个问题。不过,我认为if len(worker_tasks) &gt;= 2 行存在竞争条件。如果两个进程/线程在 worker_tasks 仅为 1 时同时到达该行,那么它们都可以执行 else 子句,不是吗?有没有办法在添加工作时专门锁定芹菜? 我不知道如何直接在 Celery 中进行。只能建议采用#2 方式并处理数据库中的并发性。

以上是关于用户启动的后台进程数量有限的主要内容,如果未能解决你的问题,请参考以下文章

在一个ORACLEA实例中最多可以启动多少个DBWR后台进程?

ORACLE后台进程

HapiJS 启动更长的后台进程

Linux中从后台启动进程,应在命令结尾处加上啥符号

Oracle后台进程

通过在 iOS 中摇动手机在后台启动进程