Storm Nimbus默认的任务调度策略

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm Nimbus默认的任务调度策略相关的知识,希望对你有一定的参考价值。

了解完Nimbus服务的启动细节后,我们将目光移到Nimbus的处理细节上,也就是nimbus.clj代码中的 service-handler [conf inimbus] 方法。

(defserverfn service-handler [conf inimbus]

;;调用 inimbus的prepare方法,

;;(master-inimbus-dir conf) -> $storm.local.dir/nimbus/inimbus

;; 如果不存在$storm.local.dir/nimbus/ 目录,则创建一个这样的目录

(.prepare inimbus conf (master-inimbus-dir conf))

 

(log-message "Starting Nimbus with conf " conf)

;; nimbus 是一个Map,包含了storm配置、INimbus实例、与集群内的zookeeper操作接口,不同topology的心跳缓存(Map),

;; executor分配器 :scheduler

(let [nimbus (nimbus-data conf inimbus)]

;;nimbus.topology.validator    

(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)

;;清除那些zookeeper上保存的,而在 nimbus的本地目录/nimbus/stormdist没有发现对应的topolgy,清除操作包括

;; 1. 将topology对应的任务信息清除 ./assignments/$storm-id

;;2. 将topology对应的storm信息清除 ./storms/$storm-id

(cleanup-corrupt-topologies! nimbus)

 

;; 得到 zookeeper下 ./storms 所有的znode节点名称,遍历

(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]

;;这个过程不是很清楚,这个是怎样得到的,完成状态转化?

(transition! nimbus storm-id :startup))

;;在backtype/storm/timer.clj 中定义schedule-recurring

;; 添加一个定时执行的mk-assignments(如果nimbus.reassign为true)、do-cleanup任务

(schedule-recurring (:timer nimbus)

0

(conf NIMBUS-MONITOR-FREQ-SECS)

(fn []

(when (conf NIMBUS-REASSIGN)

(locking (:submit-lock nimbus)

(mk-assignments nimbus)))

;;do-cleanup清除废弃的topology

;;(jar文件,zookeeper上保存有关topology的信息,包括心跳记录,

;; errors记录以及Nimbus进程保存的该topology的心跳

(do-cleanup nimbus)

))

;; Schedule Nimbus inbox cleaner

(schedule-recurring (:timer nimbus)

0

(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)

(fn []

(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))

))

;; 返回一个service-handler实现

(reify Nimbus$Iface )

)

在上面的代码中,(schedule-recurring (:timer nimbus) ) 定时执行mk-assignments和do-cleanup 这两个任务是如何实现的呢?

schedule-recurring 方法定义在time.clj中,本篇主要分析的也是这个文件。

在schedule-recurring中调用了schedule方法,首先来看一下schedule方法:

;;timer 是调控队列的一个守护线程实例

;;将afn方法加入到timer的等待队列中

;;队列元素为[执行时间,方法句柄,方法id]

(defnk schedule

[timer delay-secs afn :check-active true]

(when check-active (check-active! timer))

(let [id (uuid)

;; 取出timer控制的队列

^PriorityQueue queue (:queue timer)]

(locking (:lock timer)

;;将[执行时间,方法句柄,方法id] 信息添加到等待队列中

(.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))

 

在来分析一下schedule-recurring 方法:

 

(defn schedule-recurring

[timer delay-secs recur-secs afn]

(schedule timer

delay-secs

(fn this []

;; 执行业务逻辑

(afn)

; This avoids a race condition with cancel-timer.

(schedule timer recur-secs this :check-active false))))

在方法体中调用了 schedule方法,传入schedule 的afn参数是:

afn = (fn this []

;; 执行业务逻辑

(afn)

; This avoids a race condition with cancel-timer.

(schedule timer recur-secs this :check-active false))

 

当执行(afn)时候,首先会执行业务逻辑,然后在调用schedule方法,传入schedule 的afn 的值为:

afn = (fn this []

;; 执行业务逻辑

(afn)

; This avoids a race condition with cancel-timer.

(schedule timer recur-secs this :check-active false))

 

因此这个会以recur-secs的时间间隔不断的去执行用户定义的逻辑。 在nimbus.clj中,就是不断的执行这两个任务:

(mk-assignments nimbus)

(do-cleanup nimbus)

来看一下队列的创建过程:

(let [queue (PriorityQueue. 10 (reify Comparator

(compare

[this o1 o2]

(- (first o1) (first o2)))

(equals

[this obj]

true)))

])

前文的描述可以知道,队列中保持的element是:

[执行时间,方法句柄,方法id]

PriorityQueue ,The head of this queue is the least element with respect to the specified ordering. 也就是队首的元素是队列中最小的,在创建PriorityQueue实例的时候,实现了Comparator 的接口,当o1的开始执行时间要大于o2时,此时compare 返回的值>1,也就是o1>o2。 因此在等待队列中,对首的方法开始执行时间永远是最先开始的,那么方法的调度策略也就成为了一个谁急谁先调用。

在time.clj中开启了一个守护线程不断的从这个队列中比较当前时间i与对首元素的开始执行时间j,若j>i,则将这个线程sleep j-i,否则,则取出对首元素的方法,执行调用(afn).

更多有关PriorityQueue(优先级队列)有关的实现细节待续。

以上是关于Storm Nimbus默认的任务调度策略的主要内容,如果未能解决你的问题,请参考以下文章

JStorm与Storm源码分析--Scheduler,调度器

Storm 简单介绍

spark 调度优化

Storm篇--Storm 容错机制

spark-调度策略之FAIR

YARN资源调度策略之Capacity Scheduler