Spark Streaming源码解读之JobScheduler详解

Posted snail_gesture

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming源码解读之JobScheduler详解相关的知识,希望对你有一定的参考价值。

一:JobSheduler的源码解析
1. JobScheduler是Spark Streaming整个调度的核心,相当于Spark Core上的DAGScheduler.
2. Spark Streaming为啥要设置两条线程?
setMaster指定的两条线程是指程序运行的时候至少需要两条线程。一条线程用于接收数据,需要不断的循环。而我们指定的线程数是用于作业处理的。
3. JobSheduler的启动是在StreamContext的start方法被调用的时候启动的。

def start(): Unit = synchronized 
  state match 
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized 
        StreamingContext.assertNoOtherContextIsActive()
        try 
          validate()
//而这里面启动的新线程是调度方面的,因此和我们设置的线程数没有关系。
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") 
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          
4.  jobScheduler会负责逻辑层面的Job,并将其物理级别的运行在Spark之上.
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging 
5.  jobScheduler的start方法源码如下:
def start(): Unit = synchronized 
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") 
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for 
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
   ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")

6.  其中processEvent的源码如下:
private def processEvent(event: JobSchedulerEvent) 
  try 
    event match 
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    
   catch 
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  

7.  handleJobStart的源码如下:
private def handleJobStart(job: Job, startTime: Long) 
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) 
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)

8.  JobScheduler初始化的时候干了那些事?

此时为啥要设置并行度呢?
1) 如果Batch Duractions中有多个Output操作的话,提高并行度可以极大的提高性能。
2) 不同的Batch,线程池中有很多的线程,也可以并发运行。
将逻辑级别的Job转化为物理级别的job就是通过newDaemonFixedThreadPool线程实现的。

// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
//可以手动设置并行度
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
// numConcurrentJobs 默认是1
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
//初始化JoGenerator
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
//
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null

print的函数源码如下:
1. DStream中的print源码如下:

/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(): Unit = ssc.withScope 
  print(10)

2.  实际调用的时候还是对RDD进行操作。
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope 
  def foreachFunc: (RDD[T], Time) => Unit = 
    (rdd: RDD[T], time: Time) => 
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    
  
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)

3.  foreachFunc封装了RDD的操作。
/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = 
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()

4.  每个BatchDuractions都会根据generateJob生成作业。
/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) 

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None
//每个Batch Duractions都根据generateJob生成Job
  override def generateJob(time: Time): Option[Job] = 
    parent.getOrCompute(time) match 
      case Some(rdd) =>

        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) 
//foreachFunc基于rdd和time封装为func了,此时的foreachFunc就被job.run
//的时候调用了。
//此时的RDD就是基于时间生成的RDD,这个RDD就是DStreamGraph中的最后一个DStream决定的。然后
          foreachFunc(rdd, time)
        
        Some(new Job(time, jobFunc))
      case None => None
    
  

5.  此时的foreachFunc是从哪里来的?
private[streaming]
//参数传递过来的,这个时候就要去找forEachDStream在哪里被调用。 
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) 
6.   由此可以知道真正Job的生成是通过ForeachDStream通generateJob来生成的,此时是逻辑级别的,但是真正被物理级别的调用是在JobGenerator中generateJobs被调用的。
def generateJobs(time: Time): Seq[Job] = 
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized 
//此时的outputStream就是forEachDStream
    outputStreams.flatMap  outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    
  
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs

6.  由此可以知道真正Job的生成是通过ForeachDStream通过generateJob来生成的,此时是逻辑级别的,但是真正被物理级别的调用是在JobGenerator中generateJobs被调用的。
def generateJobs(time: Time): Seq[Job] = 
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized 
//此时的outputStream就是forEachDStream
    outputStreams.flatMap  outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    
  
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs

以上是关于Spark Streaming源码解读之JobScheduler详解的主要内容,如果未能解决你的问题,请参考以下文章

Spark发行版笔记13:Spark Streaming源码解读之Driver容错安全性

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

Spark Streaming源码解读之Executor容错安全性

Spark 定制版:008~Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

Spark Streaming源码解读之生成全生命周期彻底研究与思考

Spark版本定制八:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考