Spark的RDD检查点实现分析

Posted 泰山不老生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的RDD检查点实现分析相关的知识,希望对你有一定的参考价值。

概述

在《深入理解Spark:核心思想与源码分析》一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾。所以此文的目的旨在查漏补缺,完善本书的内容。

Spark的RDD执行完成之后会保存检查点,便于当整个作业运行失败重新运行时候,从检查点恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源。本着重分析检查点的代码实现,更深入理解其原理。在《深入理解Spark:核心思想与源码分析》一书的第5章中讲到在获取RDD分区信息时会调用partitions方法(见代码清单5-11),在获取依赖时会调用dependencies方法(见代码清单5-28)。

代码清单5-11   partitions方法的实现

  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }

代码清单5-28   dependencies方法的实现

  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }
在代码清单5-11所示的RDD的partitions方法和代码清单5-28所示的RDD的dependencies方法中都使用了checkpointRDD,checkpointRDD的定义如下。
  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
从checkpointRDD的定义看到,checkpointRDD的信息来自于checkpointData,checkpointData的定义如下:
  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
所以checkpointRDD实际是通过调用RDDCheckpointData的checkpointRDD方法得到的,checkpointRDD的实现见代码清单5-63。

代码清单5-63         获取检查点数据

  def checkpointRDD: Option[RDD[T]] = {
    RDDCheckpointData.synchronized {
      cpRDD
    }
  }

cpRDD在保存了检查点之后将持有CheckpointRDD,其类型定义如下:

  var cpState = Initialized
  @transient var cpFile: Option[String] = None
  var cpRDD: Option[RDD[T]] = None
上面的代码除了展示cpRDD的类型外,还定义了cpFile和cpState。cpFile用于保存检查点写入HDFS的文件目录,cpState用于表示当前RDD设置检查点的状态,包括初始化(Initialized)、标记将要保存检查点(MarkedForCheckpoint)、保存检查点中(CheckpointingInProgress)和设置检查点完成(Checkpointed)几个状态。RDDCheckpointData的cpState字段默认是Initialized状态。

检查点的启用

用户提交的Spark作业必须主动调用RDD的checkpoint方法(见代码清单5-64),才会启动检查点功能。

代码清单5-64         启用检查点功能
  def checkpoint() {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new RDDCheckpointData(this))
      checkpointData.get.markForCheckpoint()
    }
  }

只有调用了checkpoint方法,RDD才会创建RDDCheckpointData对象,并由checkpointData持有。同时会调用RDDCheckpointData的markForCheckpoint方法(见代码清单5-65)将cpState状态置为MarkedForCheckpoint。

代码清单5-65         标记启用检查点
  def markForCheckpoint() {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) cpState = MarkedForCheckpoint
    }
  }

markForCheckpoint方法中将cpState置为MarkedForCheckpoint,有着重要意义:一方面表示启用检查点,另一方面只有当cpState等于MarkedForCheckpoint时,才能够保存检查点。

检查点的保存

cpRDD用于持有CheckpointRDD,但是它是什么时候持有的呢?下面我们将分析检查点的保存时机。在介绍代码清单5-21时,简单的提到了调用RDD的doCheckpoint方法保存检查点,现在来具体分析其代码实现,见代码清单5-66。

代码清单5-66         保存检查点

  def doCheckpoint() {
    RDDCheckpointData.synchronized {
      if (cpState == MarkedForCheckpoint) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // Create the output path for the checkpoint
    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException("Failed to create checkpoint path " + path)
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = rdd.context.broadcast(
      new SerializableWritable(rdd.context.hadoopConfiguration))
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
    if (newRDD.partitions.size != rdd.partitions.size) {
      throw new SparkException(
        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
    }

    // Change the dependencies and partitions of the RDD
    RDDCheckpointData.synchronized {
      cpFile = Some(path.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD)   // Update the RDD‘s dependencies and partitions
      cpState = Checkpointed
    }
    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
  }

通过阅读doCheckpoint方法,其执行步骤总结如下:

1)        校验用户作业是否启用了检查点,即是否调用了checkpoint方法将cpState置为MarkedForCheckpoint。如果没有启用检查点,则直接返回,不继续进行检查点的保存。

2)        在HDFS上创建用于保存检查点数据的文件路径。其中checkpointDir必须由用户作业调用SparkContext的setCheckpointDir方法(见代码清单5-67)设置。

3)        运行作业,此作业实际执行了CheckpointRDD的writeToFile方法(见代码清单5-68),将检查点数据保存的HDFS上。

4)        将构造的CheckpointRDD由cpRDD持有,检查点保存目录由cpFile持有,最后将cpState设置为Checkpointed。由于保存了检查点,说明此RDD已经成功执行,其依赖和分区相关的信息将不再使用,即便是Job恢复也只需要从检查点读取数据,所以调用RDD的markCheckpointed方法(见代码清单5-69)清除依赖与分区信息。

代码清单5-67         设置作业检查点在HDFS上的保存路径

  def setCheckpointDir(directory: String) {
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

代码清单5-68         将检查点写入HDFS

  def writeToFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
      blockSize: Int = -1
    )(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    val finalOutputName = splitIdToFile(ctx.partitionId)
    val finalOutputPath = new Path(outputDir, finalOutputName)
    val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)

    if (fs.exists(tempOutputPath)) {
      throw new IOException("Checkpoint failed: temporary path " +
        tempOutputPath + " already exists")
    }
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    serializeStream.writeAll(iterator)
    serializeStream.close()

    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo("Deleting tempOutputPath " + tempOutputPath)
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: "
          + ctx.attemptId + " and final output path does not exist")
      } else {
        // Some other copy of this task must‘ve finished before us and renamed it
        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
        fs.delete(tempOutputPath, false)
      }
    }
  }

代码清单5-69         清除RDD的依赖与分区
  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
  }

  protected def clearDependencies() {
    dependencies_ = null
  }

使用检查点


前两个小节分别讲解了检查点如何启用以及启用后如何实现保存的原理和分析,在5.7节一开始介绍了检查点的两种使用场景:

1)        获取RDD的依赖时,如果有了检查点,则从检查点中读取;

2)        获取RDD的分区时,如果有了检查点,则从检查点中读取。

除了以上两种场景,还有一种场景会间接使用RDD的检查点数据,那就是在计算过程中调用RDD的computeOrReadCheckpoint方法(见代码清单5-70)以便直接从检查点读取保存的计算结果,关于此方法的具体使用放在第6章的分析代码清单6-1时介绍,此处只分析其使用检查点的实现。

代码清单5-70         从检查点读取计算结果

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
  }
这里的isCheckpointed实际是一个方法,代码如下。

  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)

isCheckpointed实际代理了RDDCheckpointData的isCheckpointed方法(见代码清单5-71),用于判断当前RDD是否已经设置了检查点。

代码清单5-71         判断RDD是否已经保持了检查点

  def isCheckpointed: Boolean = {
    RDDCheckpointData.synchronized { cpState == Checkpointed }
  }

根据之前的分析,我们知道如果已经保存了检查点,那么cpState必然等于Checkpointed,所以isCheckpointed方法将返回true。因此代码清单5-70将会继续执行firstParent[T].iterator(split, context)。而firstParent(见代码清单5-13)首先会调用代码清单5-28所示的dependencies方法,这样计算过程中调用computeOrReadCheckpoint,使用检查点的过程实际退化为我们说的获取RDD依赖时使用检查点的方式。而此时的依赖已经被CheckpointRDD所替代,经过迭代计算(请参考第6章),最终会调用CheckpointRDD的compute方法(见代码清单5-72),从其实现可知从检查点读取计算结果实际就是读取之前分析的写入HDFS的数据。

代码清单5-72         从HDFS保存的检查点读取数据

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
    CheckpointRDD.readFromFile(file, broadcastedConf, context)
  }

以上是关于Spark的RDD检查点实现分析的主要内容,如果未能解决你的问题,请参考以下文章

RDD之七:Spark容错机制

Spark GraphX图计算代码实现,源码分析

Tachyon 默认情况下是不是由 Apache Spark 中的 RDD 实现?

10. spark源代码分析(基于yarn cluster模式)- 聊聊RDD和Depedency

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

Spark-序列化依赖关系持久化