Spark Streaming Source Code Analysis: The Full Lifecycle of RDD Generation

Posted snail_gesture



1. Who generates the RDDs?
2. How are the RDDs generated?

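import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("spark://Master:7077").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream("Master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print() // output operation; its source is examined later in this post
    ssc.start()
    ssc.awaitTermination()
  }
}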

1. DStreams depend on one another. For example, the map operation produces a MappedDStream.
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

2.  The compute method of MappedDStream first obtains the RDD of its parent DStream and then applies map to that result; mapFunc is the business logic we pass in.
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

3.  DStream:

a)  DStreams depend on one another: apart from the first DStream, which is created from the data source, every DStream depends on the DStreams before it.
b)  A DStream generates RDDs based on time.
/**
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */
abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {
  // class body omitted
}
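To make the three properties concrete, here is a minimal sketch in plain Scala with no Spark dependency. The names MiniDStream, Source, Lengths and lineage are hypothetical and exist only for this illustration; an RDD is modeled as a Seq and durations as Long milliseconds.

```scala
object DStreamPropertiesSketch {
  // The three properties listed above, in simplified form.
  abstract class MiniDStream[T](val name: String) {
    def dependencies: List[MiniDStream[_]]        // streams this one depends on
    def slideDuration: Long                       // interval at which a batch is produced
    def compute(validTime: Long): Option[Seq[T]]  // how to produce the batch for a time
  }

  // Hypothetical source stream: no dependencies, returns a fixed batch.
  class Source(batch: Seq[String], interval: Long) extends MiniDStream[String]("source") {
    def dependencies: List[MiniDStream[_]] = Nil
    def slideDuration: Long = interval
    def compute(validTime: Long): Option[Seq[String]] = Some(batch)
  }

  // Derived stream: depends on its parent and transforms the parent's batch.
  class Lengths(parent: MiniDStream[String]) extends MiniDStream[Int]("lengths") {
    def dependencies: List[MiniDStream[_]] = List(parent)
    def slideDuration: Long = parent.slideDuration
    def compute(validTime: Long): Option[Seq[Int]] =
      parent.compute(validTime).map(_.map(_.length))
  }

  // Walk the dependency list back to the source, collecting stream names.
  def lineage(s: MiniDStream[_]): List[String] =
    s.name :: s.dependencies.flatMap(d => lineage(d))
}
```

Calling lineage on a Lengths stream built over a Source walks from the derived stream back to the source, which is the same direction in which the real DStreams resolve their parents.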


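1. For every Time, the generatedRDDs HashMap in DStream holds one generated RDD, and each of these RDDs corresponds to one Job. The RDD stored here is the last RDD produced by the DStream operations for that batch interval, and that last RDD depends on all the RDDs before it.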
// RDDs generated, marked as private[streaming] so that testsuites can access it
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()


1. getOrCompute in DStream generates the RDD for a given time.

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
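The retrieve-or-compute-and-cache pattern above can be sketched without Spark. This is an illustration under stated assumptions, not the library's implementation: MiniStream and computeCalls are hypothetical, times are Long milliseconds, and the validity rule is a simplified version of the check (after zeroTime and on a multiple of the slide duration).

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // An "RDD" is modeled as a Seq[Int]; times and durations are Long milliseconds.
  class MiniStream(zeroTime: Long, slideDuration: Long) {
    val generated = mutable.HashMap.empty[Long, Seq[Int]]
    var computeCalls = 0

    // Simplified validity rule (an assumption of this sketch).
    def isTimeValid(time: Long): Boolean =
      time > zeroTime && (time - zeroTime) % slideDuration == 0

    // Hypothetical batch computation; counts how often it runs.
    def compute(time: Long): Option[Seq[Int]] = {
      computeCalls += 1
      Some(Seq(time.toInt))
    }

    // Return the cached batch if present; otherwise compute it once and cache it.
    def getOrCompute(time: Long): Option[Seq[Int]] =
      generated.get(time).orElse {
        if (isTimeValid(time)) {
          val batch = compute(time)
          batch.foreach(b => generated.put(time, b))
          batch
        } else None
      }
  }
}
```

Asking twice for the same valid time runs compute only once; asking for a time that does not fall on a slide boundary returns None and caches nothing.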

2.  The compute source in ReceiverInputDStream is shown below. ReceiverInputDStream generates the first RDD in the computation chain; the RDDs that follow depend on it.
/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      // receiverTracker keeps track of the received data
      val receiverTracker = ssc.scheduler.receiverTracker
      // blockInfos
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      // validTime is ...
      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
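The lookup logic in compute can be sketched in plain Scala. BlockInfo and Allocations are hypothetical stand-ins for the tracker's data, and the function returns block ids plus a record count instead of an RDD; it only illustrates the two branches (before the start time, and a normal batch).

```scala
object ReceiverComputeSketch {
  // Hypothetical stand-in for block metadata; only the fields the sketch needs.
  final case class BlockInfo(blockId: String, numRecords: Option[Long])

  // batch time -> (stream id -> blocks allocated to that stream for the batch)
  type Allocations = Map[Long, Map[Int, Seq[BlockInfo]]]

  // Returns the block ids backing the batch plus the record count reported for it.
  def compute(validTime: Long, startTime: Long, streamId: Int,
              allocations: Allocations): (Seq[String], Long) =
    if (validTime < startTime) {
      // Before the context start time: an empty batch.
      (Seq.empty, 0L)
    } else {
      val blockInfos = allocations.get(validTime)
        .map(_.getOrElse(streamId, Seq.empty))
        .getOrElse(Seq.empty)
      // flatMap drops blocks whose record count is unknown (None).
      val records = blockInfos.flatMap(_.numRecords).sum
      (blockInfos.map(_.blockId), records)
    }
}
```

A batch time with no allocation yields an empty batch rather than an error, which matches the getOrElse(id, Seq.empty) fallback in the source above.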

3.  The createBlockRDD source is as follows:
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
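The branching in createBlockRDD is easier to see once the logging is stripped away. The following is a rough sketch of just the decision, with hypothetical names (RddKind, WalBacked, PlainBlock, BlockInfo, choose); it leaves out the block-validity filtering and the warnings.

```scala
object BlockRddChoiceSketch {
  sealed trait RddKind
  case object WalBacked extends RddKind
  case object PlainBlock extends RddKind

  // Hypothetical block metadata: whether a write-ahead-log handle was recorded for the block.
  final case class BlockInfo(blockId: String, hasWalHandle: Boolean)

  def choose(blockInfos: Seq[BlockInfo], walEnabled: Boolean): RddKind =
    if (blockInfos.nonEmpty) {
      // Non-empty batch: WAL-backed only if every block carries a WAL handle.
      if (blockInfos.forall(_.hasWalHandle)) WalBacked else PlainBlock
    } else {
      // Empty batch: follow the configuration.
      if (walEnabled) WalBacked else PlainBlock
    }
}
```

Note that for a non-empty batch the configuration flag does not decide the RDD type; only the presence of WAL handles on all blocks does.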

4.  The map operator produces a MappedDStream.
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

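5.  The MappedDStream source is shown below. Apart from the first DStream, which generates its RDD itself, every other DStream starts from the RDD generated by the preceding DStream and returns a new RDD. Transformations on a DStream are therefore transformations on RDDs.
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    // getOrCompute returns the parent's RDD; the map that follows is applied to that RDD
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}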

6.  The ForEachDStream source is as follows:
/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
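A point worth noticing in generateJob is that the job only wraps a function; nothing is executed when the job is created. A minimal sketch of that idea follows, where MiniJob, generateJob and demo are hypothetical names and a batch is modeled as a Seq[String].

```scala
import scala.collection.mutable.ArrayBuffer

object ForEachJobSketch {
  // A job is just a time plus a deferred function; nothing runs until run() is called.
  final class MiniJob(val time: Long, func: () => Unit) {
    def run(): Unit = func()
  }

  // Mirrors the shape of generateJob: no parent batch means no job.
  def generateJob(time: Long, parentBatch: Option[Seq[String]],
                  foreachFunc: (Seq[String], Long) => Unit): Option[MiniJob] =
    parentBatch match {
      case Some(batch) => Some(new MiniJob(time, () => foreachFunc(batch, time)))
      case None        => None
    }

  // Returns how many elements the output function had seen before and after running the job.
  def demo(): (Int, Int) = {
    val seen = ArrayBuffer.empty[String]
    val job = generateJob(1000L, Some(Seq("a", "b")), (batch, _) => { seen ++= batch; () })
    val before = seen.size // the job exists but has not run yet
    job.foreach(_.run())
    (before, seen.size)
  }
}
```

In the sketch, creating the job leaves the buffer untouched; the output function only sees the batch once run() is called, which is the role the scheduler plays in the real system.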

7.  The source of the print function used in the example above is shown below; foreachFunc operates directly on the RDD.
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
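The take(num + 1) call in print is a small trick: fetching one extra element is enough to tell whether "..." should be printed, without counting the whole RDD. A sketch on a plain Seq, with the hypothetical helper render returning the lines instead of printing them:

```scala
object PrintSketch {
  // Render the first `num` elements, adding "..." only when more elements exist.
  def render[T](batch: Seq[T], num: Int): Seq[String] = {
    val firstNum = batch.take(num + 1) // one extra element as a "more?" probe
    val shown = firstNum.take(num).map(_.toString)
    if (firstNum.length > num) shown :+ "..." else shown
  }
}
```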


1. The generateJobs source in JobGenerator is as follows:
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
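The Try / Success / Failure structure means that an exception during job generation is reported rather than propagated out of the generator. A minimal sketch of that pattern, with the hypothetical function generate returning an Either in place of submitting a job set or calling reportError, and jobs modeled as strings:

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // Wrap job generation in Try so that an exception is reported instead of escaping.
  def generate(time: Long, makeJobs: Long => Seq[String]): Either[String, Seq[String]] =
    Try(makeJobs(time)) match {
      case Success(jobs) => Right(jobs) // in the real code: submitted as a job set
      case Failure(e)    => Left(s"Error generating jobs for time $time: ${e.getMessage}")
    }
}
```

Keep in mind that scala.util.Try only catches non-fatal exceptions; fatal errors such as OutOfMemoryError still propagate.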

2.  In DStreamGraph, generateJobs is what actually triggers the RDD generation analyzed above.
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
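Because generateJob returns an Option, flatMap over the output streams keeps only the streams that actually produced a job for the batch. A small sketch, assuming each output stream is modeled as a function from time to an optional job name (GraphJobsSketch is a hypothetical name):

```scala
object GraphJobsSketch {
  // Each output stream either yields a job for the batch time or yields nothing.
  def generateJobs(time: Long, outputStreams: Seq[Long => Option[String]]): Seq[String] =
    outputStreams.flatMap(stream => stream(time))
}
```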

