Spark版本定制第6天:Job动态生成和深度思考

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark版本定制第6天:Job动态生成和深度思考相关的知识,希望对你有一定的参考价值。

本期内容:

1 Job动态生成

2 深度思考

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

  

  在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler

private[streaming] 
class JobScheduler(val ssc: StreamingContext) extends Logging 

  在JobScheduler中有两个重要的成员:JobGenerator和ReceiverTracker,而在JobGenerator中有两个至关重要的成员就是RecurringTimer和EventLoop。

  当JobGenerator启动的时候会调用startFirstTime方法,当然如果不是第一次启动就会restart。

  if (ssc.isCheckpointPresent) { 
    restart()      
  } else { 
    startFirstTime() 
  }

  在 这个方法中:会启动DStream和定时器

private def startFirstTime() { 
  val startTime = new Time(timer.getStartTime()) 
  graph.start(startTime - graph.batchDuration) 
  timer.start(startTime.milliseconds) 
  logInfo("Started JobGenerator at " + startTime) 
} 

  定时器负责在每一个时间间隔batchInterval,在EventLoop循环中发送一次消息。在EventLoop接收到消息后就会启动run方法在消息队列总来执行

override def run(): Unit = { 
  try { 
    while (!stopped.get) { 
      val event = eventQueue.take() 
      try { 
        onReceive(event) 
      } catch { 
        case NonFatal(e) => { 
          try { 
            onError(e) 
          } catch { 
            case NonFatal(e) => logError("Unexpected error in " + name, e) 
          } 
        } 
      } 
    } 
  } catch { 
    case ie: InterruptedException => // exit even if eventQueue is not empty 
    case NonFatal(e) => logError("Unexpected error in " + name, e) 
  } 
} 

  此时在消息中会执行generateJobs方法来不断地生成job,至此,job完成了生成的过程。

private def generateJobs(time: Time) { 
  // Set the SparkEnv in this thread, so that job generation code can access the environment 
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager 
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 
  SparkEnv.set(ssc.env) 
  Try { 
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
    graph.generateJobs(time) // generate jobs using allocated block 
  } match { 
    case Success(jobs) => 
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
    case Failure(e) => 
      jobScheduler.reportError("Error generating jobs for time " + time, e) 
  } 
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
} 

  

在job生成过程中主要包含了以下4个步骤

  1. 要求ReceiverTracker将目前已收到的数据进行一次allocate,即将上次batch切分后的数据切分到到本次新的batch里

  2. 要求DStreamGraph复制出一套新的 RDD DAG 的实例。整个DStreamGraph.generateJobs(time)遍历结束的返回值是Seq[Job]

  3. 将第2步生成的本 batch 的 RDD DAG,和第1步获取到的 meta 信息,一同提交给JobScheduler异步执行这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的meta信息。这三者包装为一个JobSet,然后调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler。这里的向JobScheduler提交过程与JobScheduler接下来在jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回。

  4. 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个checkpoint这里做checkpoint也只是异步提交一个DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的JobSet等实际运行时信息。

 

 

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

以上是关于Spark版本定制第6天:Job动态生成和深度思考的主要内容,如果未能解决你的问题,请参考以下文章

第6课:Spark Streaming源码解读之Job动态生成和深度思考

Spark版本定制第7天:JobScheduler内幕实现和深度思考

(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

Spark版本定制第8天:RDD生成生命周期彻底

Spark版本定制第10天:流数据生命周期和思考

Spark Streaming源码解读之Job动态生成和深度思考