spark 源码分析之三 -- LiveListenerBus介绍

Posted johnny666888

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 源码分析之三 -- LiveListenerBus介绍相关的知识,希望对你有一定的参考价值。

LiveListenerBus 

  首先,它定义了 4 个 消息堵塞队列,队列的名字分别为shared、appStatus、executorManagement、eventLog。队列的类型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue,保存在 queues 变量中。每一个队列上都可以注册监听器,如果队列没有监听器,则会被移除。

  它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。 如果总线没有启动,有事件过来,先放到 一个待添加的可变数组中,否则直接将事件 post 到每一个队列中。

  其直接依赖类是 AsyncEventQueue, 相当于 LiveListenerBus 的多事件队列是对 AsyncEventQueue 进一步的封装。

AsyncEventQueue

  其继承关系如下:

  技术图片

  它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。

  其内部维护了listenersPlusTimers 主要就是用来保存注册到这个总线上的监听器对象的。

  post 操作将事件放入内部的 LinkedBlockingQueue中,默认大小是 10000。

  有一个事件分发器,它不停地从 LinkedBlockingQueue 执行 take 操作,获取事件,并将事件进一步分发给所有的监听器,由org.apache.spark.scheduler.SparkListenerBus#doPostEvent 方法实现事件转发,具体代码如下:

 1 protected override def doPostEvent(
 2       listener: SparkListenerInterface,
 3       event: SparkListenerEvent): Unit = 
 4     event match 
 5       case stageSubmitted: SparkListenerStageSubmitted =>
 6         listener.onStageSubmitted(stageSubmitted)
 7       case stageCompleted: SparkListenerStageCompleted =>
 8         listener.onStageCompleted(stageCompleted)
 9       case jobStart: SparkListenerJobStart =>
10         listener.onJobStart(jobStart)
11       case jobEnd: SparkListenerJobEnd =>
12         listener.onJobEnd(jobEnd)
13       case taskStart: SparkListenerTaskStart =>
14         listener.onTaskStart(taskStart)
15       case taskGettingResult: SparkListenerTaskGettingResult =>
16         listener.onTaskGettingResult(taskGettingResult)
17       case taskEnd: SparkListenerTaskEnd =>
18         listener.onTaskEnd(taskEnd)
19       case environmentUpdate: SparkListenerEnvironmentUpdate =>
20         listener.onEnvironmentUpdate(environmentUpdate)
21       case blockManagerAdded: SparkListenerBlockManagerAdded =>
22         listener.onBlockManagerAdded(blockManagerAdded)
23       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
24         listener.onBlockManagerRemoved(blockManagerRemoved)
25       case unpersistRDD: SparkListenerUnpersistRDD =>
26         listener.onUnpersistRDD(unpersistRDD)
27       case applicationStart: SparkListenerApplicationStart =>
28         listener.onApplicationStart(applicationStart)
29       case applicationEnd: SparkListenerApplicationEnd =>
30         listener.onApplicationEnd(applicationEnd)
31       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
32         listener.onExecutorMetricsUpdate(metricsUpdate)
33       case executorAdded: SparkListenerExecutorAdded =>
34         listener.onExecutorAdded(executorAdded)
35       case executorRemoved: SparkListenerExecutorRemoved =>
36         listener.onExecutorRemoved(executorRemoved)
37       case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
38         listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
39       case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
40         listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
41       case executorBlacklisted: SparkListenerExecutorBlacklisted =>
42         listener.onExecutorBlacklisted(executorBlacklisted)
43       case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
44         listener.onExecutorUnblacklisted(executorUnblacklisted)
45       case nodeBlacklisted: SparkListenerNodeBlacklisted =>
46         listener.onNodeBlacklisted(nodeBlacklisted)
47       case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
48         listener.onNodeUnblacklisted(nodeUnblacklisted)
49       case blockUpdated: SparkListenerBlockUpdated =>
50         listener.onBlockUpdated(blockUpdated)
51       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
52         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
53       case _ => listener.onOtherEvent(event)
54     
55   

  然后去调用 listener 的相对应的方法。

  就这样,事件总线上的消息事件被监听器消费了。

 

以上是关于spark 源码分析之三 -- LiveListenerBus介绍的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo 源码分析系列之三 —— 架构原理

Tornado 高并发源码分析之三--- Application 对象

MyBatis源码分析之三层结构介绍

MyBatis源码分析之三层结构介绍

Qt基础之三十四:QTcpSocket和QTcpServer源码分析

自定义spring boot starter三部曲之三:源码分析spring.factories加载过程