RDD缓存与检查点

Posted chxyshaodiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD缓存与检查点相关的知识,希望对你有一定的参考价值。

RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。 但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD的计算结果将会被缓存在计算节点的内存中,并供后面重用。

示例如下:

def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val raw: RDD[String] = sc.makeRDD(Array("hello"))
    val current: RDD[String] = raw.map(_.toString + System.currentTimeMillis())
    //current.persist()
    current.collect().foreach(println)
    current.collect().foreach(println)
    current.collect().foreach(println)
  }

如果不加persist方法,执行结果如下:

hello1582190762213

hello1582190762463

hello1582190762526

加了persist方法之后,执行结果变为:

hello1582190869308

hello1582190869308

hello1582190869308

可见:current这个rdd的计算结果被缓存起来了 ,下游的rdd直接从缓存拿数据并进行运算。rdd及其rdd上游的计算过程被省略了,从而加快了计算过程。

 

存储级别:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)

 

堆外内存:区别于JVM内存。这一块内存不受JVM的GC回收机制的影响,而是直接向操作系统申请并自主管理的一块内存空间。为什么需要这块内存:JVM中的内存由于受到GC的影响,如果没有用完可能迟迟得不到释放,这时候如果再加入数据,就可能导致OOM问题。而如果由自己来管理内存,可以更及时地释放内存。

 

检查点

技术图片 

如图所示:

依赖链过长,会导致有大量的血统信息要被记录;

而且在进行数据恢复的时候,要重新从头开始计算,比较耗时;

因此引入了检查点:

血统信息会从检查点开始记录;

重新计算时,把检查点的数据作为元数据开始计算;

相当于是检查点之前的RDD链条被掐断,检查点作为新的RDD链条头。

示例代码如下:

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("wordcount").setMaster("local[*]")

    val sc: SparkContext = new SparkContext(conf)
    sc.setCheckpointDir("cp")

    val lines: RDD[String] = sc.parallelize(Array(("hello,spark"),("hello,scala"),("hello,world")))

    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne: RDD[(String, Int)] = words.map((_,1))
//wordToOne之前的血缘关系,会被检查点替代。
    //wordToOne.checkpoint()

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)

    wordToSum.collect()

    println(wordToSum.toDebugString)

  }

不加checkpoint的打印结果:

(8) ShuffledRDD[3] at reduceByKey at checkPoint.scala:21 []
 +-(8) MapPartitionsRDD[2] at map at checkPoint.scala:17 []
    |  MapPartitionsRDD[1] at flatMap at checkPoint.scala:15 []
    |  ParallelCollectionRDD[0] at parallelize at checkPoint.scala:13 []

从头开始记录;

加了checkpoint之后的打印结果:

(8) ShuffledRDD[3] at reduceByKey at checkPoint.scala:21 []
 +-(8) MapPartitionsRDD[2] at map at checkPoint.scala:17 []
    |  ReliableCheckpointRDD[4] at collect at checkPoint.scala:23 []

检查点所在RDD之前的RDD的血缘信息被检查点信息所替代。

 

接下来考察检查点之前的RDD会不会被重复计算:

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("wordcount").setMaster("local[*]")

    val sc: SparkContext = new SparkContext(conf)
    sc.setCheckpointDir("cp")

    val lines: RDD[String] = sc.parallelize(Array(System.currentTimeMillis().toString))

    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne: RDD[(String, Int)] = words.map((_,1))
//wordToOne之前的血缘关系,会被检查点替代。
    wordToOne.checkpoint()

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)

    wordToSum.collect().foreach(println)
    wordToSum.collect().foreach(println)
    wordToSum.collect().foreach(println)

  }

打印结果如下:

(1582193859704,1)

(1582193859704,1)

(1582193859704,1)

证明检查点之间的RDD没有被重复计算。

 

以上是关于RDD缓存与检查点的主要内容,如果未能解决你的问题,请参考以下文章

火花检查点比缓存更快吗?

数据框架到RDD这段代码无法工作。

Spark的RDD检查点实现分析

Spark基础学习笔记21:RDD检查点与共享变量

执行顺序和缓存需求

你应该总是在中间计数之前缓存一个 RDD 吗?