Simmer in R:基于队列长度和持续时间对服务器容量的变化进行建模

Posted

技术标签:

【中文标题】Simmer in R:基于队列长度和持续时间对服务器容量的变化进行建模【英文标题】:Simmer in R: Modelling changes in server capacity based on queue length and duration 【发布时间】:2019-03-26 18:50:30 【问题描述】:

我正在尝试如下建模系统:

到达根据预定义的时间表生成,并且具有由数据帧提供的已知处理时间。在模拟开始时有一个容量等于 min_daemons 的服务器。到目前为止很简单,但 nxt 部分变得棘手:根据以下算法,此容量可以在整个模拟过程中的间隔 [min_daemons , max_daemons] 上变化:

如果在模拟过程中的任何时间,队列长度达到或超过 incr_count 的值,并且在 incr_delay 内保持在此水平或以上,则向主服务器添加一个额外的容量单位。只要容量不超过 max_daemons,这种情况随时都可能发生。

反之亦然。如果在任何时候,队列长度小于 decr_count,并且在 decr_delay 的时间内保持在此级别或以下,则会删除一个容量单位,可能会降至 min_daemons 级别。

我为所有到达创建了一个轨迹,当满足上述更改服务器容量的条件时分支。这样做的问题是容量的变化总是与到达事件相关联。我真正想要的是一个独立于到达轨迹的进程,它始终监控队列长度并进行适当的容量变化。

我考虑过使用某种虚拟到达过程来完成此操作,例如在模拟的每一秒,但我不确定是否可以防止虚拟到达与真实到达竞争服务器容量。

#instantiate simulation environment
env <- simmer("queues") %>% add_resource("daemon",1) %>% add_global("incr_start",99999) %>% add_global("decr_start",99999)

run <-  trajectory() %>% 
  branch(option = function() if (get_queue_count(env,"daemon") >=  incr_count) 1
                             else if (get_queue_count(env,"daemon") <= decr_count) 2
                             else 3
         ,
         continue = c(T, T, T)
         ,
         trajectory("increment?")
         %>% branch(option = function() if (now(env) - get_global(env,"incr_start") >= incr_delay
                                            & get_capacity(env,"daemon") < max_daemons) 1
                                        else if (get_global(env,"incr_start")==99999) 2
                                        else 3
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("increment")
                    %>% log_(function () paste("Queue size is: ",get_queue_count(env,"daemon")))
                    %>% log_(function () 
                      paste("Queue has exceeded count for ",now(env)-get_global(env,"incr_start")," seconds."))
                    %>% set_capacity(resource = "daemon", value = 1, mod="+")
                    ,
                    trajectory("update incr start")
                    %>% set_global("incr_start",now(env))
                    %>% log_("Queue count is now above increment count. Starting increment timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet increment criteria. Doing nothing.")
         )
         ,
         trajectory("decrement?")
         %>% branch(option = function() if (now(env) - get_global(env,"decr_start") >= decr_delay
                                            & get_capacity(env,"daemon") > min_daemons) 1
                                        else if (get_global(env,"decr_start")==99999) 2
                                        else 3
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("decrement")
                    %>% log_(function () paste("Queue size is: ",get_queue_count(env,"daemon")))
                    %>% log_(function () 
                      paste("Queue has been less than count for ",now(env)-get_global(env,"incr_start")," seconds."))
                    %>% set_capacity(resource = "daemon", value = -1, mod="+")
                    ,
                    trajectory("update decr start")
                    %>% set_global("decr_start",now(env))
                    %>% log_("Queue count is now below decrement count. Starting decrement timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet decrement criteria. Doing nothing.")
         )
         ,
         trajectory("reset_timer")
         %>% log_("Did not meet criteria to increment or decrement. Resetting timers.")
         %>% set_global("decr_start", values = 99999)
         %>% set_global("decr_start", values = 99999)
  ) %>%
  seize("daemon") %>% 
  log_("Now running") %>%
  log_(function () paste(get_queue_count(env,"daemon")," runs in the queue.")) %>%
  timeout_from_attribute("service") %>% 
  release("daemon") %>% 
  log_("Run complete")

env %>% 
  add_dataframe("run", run, arr,time="absolute") %>% 
  run(200)

我需要进行更多调试以验证模拟是否按预期工作,但我完全理解这个模型是错误的。我希望该设计不会过多损害其有效性,但我也希望获得有关如何设计出更贴近现实生活的反馈。

【问题讨论】:

问题是什么?你的模型哪里出错了?你怎么知道是错的? 第 5 和第 6 段是问题所在。我的代码执行没有错误,但模型不正确,我无法弄清楚如何使用标准的 simmer 工具正确实现它。真正的系统会不断检查队列在一定时间内超过一定长度范围的时间,此时它会调整服务器的容量。我的模型只有在有新品出现时才能进行这种调整。 【参考方案1】:

定期检查状态会破坏离散事件的全部目的。我们这里有一个异步过程,因此对其建模的正确方法是使用信号。

我们需要问自己:什么时候可以排队...

    增加?当到达点击seize() 活动时。因此,在尝试抢占资源之前,我们需要检查入队到达的数量并采取相应措施:

    如果等于decr_count,则必须发送信号以取消任何降低服务器容量的尝试。 如果等于incr_count - 1,则必须发送信号以请求增加服务器容量。 否则什么也不做。

    减少?当一个到达被服务时(即继续到seize()之后的下一个活动。所以在抢占资源之后,我们还需要检查入队到达的数量:

    如果等于incr_count - 1,则必须发送信号以取消任何增加服务器容量的尝试。 如果等于decr_count,则必须发送信号以请求减少服务器容量。 否则什么也不做。

检查入队到达的数量并决定需要哪种信号(如果有)的过程可以捆绑在一个可重用函数中(我们称之为check_queue),如下所示:

library(simmer)

env <- simmer()

check_queue <- function(.trj, resource, mod, lim_queue, lim_server) 
  .trj %>% branch(
    function() 
      if (get_queue_count(env, resource) == lim_queue[1])
        return(1)
      if (get_queue_count(env, resource) == lim_queue[2] &&
          get_capacity(env, resource)    != lim_server)
        return(2)
      0 # pass
    ,
    continue = c(TRUE, TRUE),
    trajectory() %>% send(paste("cancel", mod[1])),
    trajectory() %>% send(mod[2])
  )


main <- trajectory() %>%
  check_queue("resource", c("-", "+"), c(decr_count, incr_count-1), max_daemons) %>%
  seize("resource") %>%
  check_queue("resource", c("+", "-"), c(incr_count-1, decr_count), min_daemons) %>%
  timeout_from_attribute("service") %>%
  release("resource")

这样,主轨迹就相当简单了。然后,我们需要几个进程来接收这些信号,并在一些延迟后增加/减少容量:

change_capacity <- function(resource, mod, delay, limit) 
  trajectory() %>%
    untrap(paste("cancel", mod)) %>%
    trap(mod) %>%
    wait() %>%
    # signal received
    untrap(mod) %>%
    trap(paste("cancel", mod),
         handler = trajectory() %>%
           # cancelled! start from the beginning
           rollback(Inf)) %>%
    timeout(delay) %>%
    set_capacity(resource, as.numeric(paste0(mod, 1)), mod="+") %>%
    # do we need to keep changing the capacity?
    rollback(2, check=function() get_capacity(env, resource) != limit) %>%
    # start from the beginning
    rollback(Inf)


incr_capacity <- change_capacity("resource", "+", incr_delay, max_daemons)
decr_capacity <- change_capacity("resource", "-", decr_delay, min_daemons)

最后,我们将资源、流程和数据添加到模拟环境中:

env %>%
  add_resource("resource", min_daemons) %>%
  add_generator("incr", incr_capacity, at(0)) %>%
  add_generator("decr", decr_capacity, at(0)) %>%
  add_dataframe("arrival", main, data)

注意我没有检查此代码。它可能需要一些调整,但总体思路就在那里。

【讨论】:

非常感谢!我发现你的解释非常有帮助,而且你的代码非常实用。我唯一需要更改的是回滚中的检查功能,以便它再次检查队列是否大于增量计数或小于减量计数。

以上是关于Simmer in R:基于队列长度和持续时间对服务器容量的变化进行建模的主要内容,如果未能解决你的问题,请参考以下文章

基于队列长度的 Azure 自动缩放

.wav 文件长度/持续时间,无需读取文件

go channel的设置长度思想和循环队列

Azure 队列中基于租约和基于锁的独占访问之间的区别

pandas基于时序数据计算模型预测推理需要的统计数据(累计时间长度变化变化率方差均值最大最小等):数据持续的时间(分钟)获得某一节点之后的数据总变化量获得范围内的统计量

基于matlab的傅里叶变换