Spark监听器源码阅读

Posted vortual

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark监听器源码阅读相关的知识,希望对你有一定的参考价值。

一、注册监听器并消费事件

1、创建 SparkEnv

org.apache.spark.SparkContext#createSparkEnv

2、启动用户设置的监听器

org.apache.spark.SparkContext#setupAndStartListenerBus
val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
conf.get(EXTRA_LISTENERS).foreach  classNames =>
	val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
	listeners.foreach  listener =>
		listenerBus.addToSharedQueue(listener)
		logInfo(s"Registered listener $listener.getClass().getName()")
	

3、将监听器添加到 shared 队列

org.apache.spark.scheduler.LiveListenerBus#addToSharedQueue
	org.apache.spark.scheduler.LiveListenerBus#addToQueue
		判断队列是否存在,如果存在就将监听器放到队列中,不存在要新建队列:AsyncEventQueue
		  queues.asScala.find(_.name == queue) match 
			  case Some(queue) =>
				queue.addListener(listener)

			  case None =>
				val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
				// listener 放到了 listenersPlusTimers 这个对象中。listenersPlusTimers 是 CopyOnWriteArrayList对象,是一个线程安全的 list
				newQueue.addListener(listener)
				if (started.get()) 
				  newQueue.start(sparkContext)
				
				// LiveListenerBus 会维护很多个队列,放到 queues 对象中,每个队列里面又有很多个监听器
				queues.add(newQueue)
		  

4、新建队列并将事件添加到队列中。同时启动一个不停从队列中取数据处理监听事件的线程

1、val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
创建 AsyncEventQueue. 这个对象里面包含了一个 eventQueue 对象
eventQueue 是一个阻塞队列:LinkedBlockingQueue。到时候监听事件就会往这里面塞
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

2、newQueue.start(sparkContext)
这里主要是启动一个分发线程,它调用 take() 方法一直从队列中取出事件,将事件分发到各个监听器
  private val dispatchThread = new Thread(s"spark-listener-group-$name") 
	setDaemon(true)
	override def run(): Unit = Utils.tryOrStopSparkContext(sc) 
	  dispatch()
	
  
  
  dispatchThread.start()
  
3. dispatch 方法:不断从队列取出事件并发发给各个监听者
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) 
// 从队列里面取出监听事件
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) 
  val ctx = processingTime.time()
  try 
	// Post the event to all registered listeners. 把时间发送给所有注册了的监听者
	super.postToAll(next)
   finally 
	ctx.stop()
  
  eventCount.decrementAndGet()
  next = eventQueue.take()

eventCount.decrementAndGet()


4. 调用父类的 postToAll 方法
org.apache.spark.util.ListenerBus#postToAll
遍历 listenersPlusTimers 对象,取出监听者。调用 SparkListenerBus 的 doPostEvent(listener, event) 方法
	org.apache.spark.scheduler.SparkListenerBus#doPostEvent
	根据不同的事件类型调用不同方法
	event match 
	  case stageSubmitted: SparkListenerStageSubmitted =>
		listener.onStageSubmitted(stageSubmitted)
	  case stageCompleted: SparkListenerStageCompleted =>
		listener.onStageCompleted(stageCompleted)
	   ......
	
	接下来就是执行我们具体实现的方法了 onStageSubmitted、onStageCompleted 等

二、Spark 事件产生并发送的地方

举个例子: DAGScheduler 停止的时候会发送一个 SparkListenerJobEnd 事件

org.apache.spark.scheduler.DAGScheduler#cleanUpAfterSchedulerStop
	listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
	将 SparkListenerJobEnd 事件发送给所有的监听者
		org.apache.spark.scheduler.LiveListenerBus#postToQueues
		// If the bus was already started when the check above was made, just post directly to the
		// queues.
		postToQueues(event)
		// 遍历 queues 对象,获取所有 queue,把事件发送给所有的 queue
		  private def postToQueues(event: SparkListenerEvent): Unit = 
			val it = queues.iterator()
			while (it.hasNext()) 
			  it.next().post(event)
			
		  
		  // it.next().post(event)
		  def post(event: SparkListenerEvent): Unit = 
			if (stopped.get()) 
			  return
			

			eventCount.incrementAndGet()
			// 把事件放到队列中。dispatchThread 线程会从队列取出来做处理
			if (eventQueue.offer(event)) 
			  return
			
		  

以上是关于Spark监听器源码阅读的主要内容,如果未能解决你的问题,请参考以下文章

spark 源码分析之三 -- LiveListenerBus介绍

spark源码阅读 启动代码阅读

spark.mllib源码阅读-分类算法4-DecisionTree

spark.mllib源码阅读-分类算法4-DecisionTree

spark源码阅读-脚本篇

记载我的spark源码阅读火花