Spark监听器源码阅读
Posted vortual
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark监听器源码阅读相关的知识,希望对你有一定的参考价值。
一、注册监听器并消费事件
1、创建 SparkEnv
org.apache.spark.SparkContext#createSparkEnv
2、启动用户设置的监听器
org.apache.spark.SparkContext#setupAndStartListenerBus
val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
conf.get(EXTRA_LISTENERS).foreach classNames =>
val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
listeners.foreach listener =>
listenerBus.addToSharedQueue(listener)
logInfo(s"Registered listener $listener.getClass().getName()")
3、将监听器添加到 shared 队列
org.apache.spark.scheduler.LiveListenerBus#addToSharedQueue
org.apache.spark.scheduler.LiveListenerBus#addToQueue
判断队列是否存在,如果存在就将监听器放到队列中,不存在要新建队列:AsyncEventQueue
queues.asScala.find(_.name == queue) match
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
// listener 放到了 listenersPlusTimers 这个对象中。listenersPlusTimers 是 CopyOnWriteArrayList对象,是一个线程安全的 list
newQueue.addListener(listener)
if (started.get())
newQueue.start(sparkContext)
// LiveListenerBus 会维护很多个队列,放到 queues 对象中,每个队列里面又有很多个监听器
queues.add(newQueue)
4、新建队列并将事件添加到队列中。同时启动一个不停从队列中取数据处理监听事件的线程
1、val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
创建 AsyncEventQueue. 这个对象里面包含了一个 eventQueue 对象
eventQueue 是一个阻塞队列:LinkedBlockingQueue。到时候监听事件就会往这里面塞
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
2、newQueue.start(sparkContext)
这里主要是启动一个分发线程,它调用 take() 方法一直从队列中取出事件,将事件分发到各个监听器
private val dispatchThread = new Thread(s"spark-listener-group-$name")
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sc)
dispatch()
dispatchThread.start()
3. dispatch 方法:不断从队列取出事件并发发给各个监听者
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true)
// 从队列里面取出监听事件
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL)
val ctx = processingTime.time()
try
// Post the event to all registered listeners. 把时间发送给所有注册了的监听者
super.postToAll(next)
finally
ctx.stop()
eventCount.decrementAndGet()
next = eventQueue.take()
eventCount.decrementAndGet()
4. 调用父类的 postToAll 方法
org.apache.spark.util.ListenerBus#postToAll
遍历 listenersPlusTimers 对象,取出监听者。调用 SparkListenerBus 的 doPostEvent(listener, event) 方法
org.apache.spark.scheduler.SparkListenerBus#doPostEvent
根据不同的事件类型调用不同方法
event match
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
......
接下来就是执行我们具体实现的方法了 onStageSubmitted、onStageCompleted 等
二、Spark 事件产生并发送的地方
举个例子: DAGScheduler 停止的时候会发送一个 SparkListenerJobEnd 事件
org.apache.spark.scheduler.DAGScheduler#cleanUpAfterSchedulerStop
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
将 SparkListenerJobEnd 事件发送给所有的监听者
org.apache.spark.scheduler.LiveListenerBus#postToQueues
// If the bus was already started when the check above was made, just post directly to the
// queues.
postToQueues(event)
// 遍历 queues 对象,获取所有 queue,把事件发送给所有的 queue
private def postToQueues(event: SparkListenerEvent): Unit =
val it = queues.iterator()
while (it.hasNext())
it.next().post(event)
// it.next().post(event)
def post(event: SparkListenerEvent): Unit =
if (stopped.get())
return
eventCount.incrementAndGet()
// 把事件放到队列中。dispatchThread 线程会从队列取出来做处理
if (eventQueue.offer(event))
return
以上是关于Spark监听器源码阅读的主要内容,如果未能解决你的问题,请参考以下文章
spark 源码分析之三 -- LiveListenerBus介绍
spark.mllib源码阅读-分类算法4-DecisionTree