RDD的cachepersistcheckpoint的区别和StorageLevel存储级别划分

Posted renyang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD的cachepersistcheckpoint的区别和StorageLevel存储级别划分相关的知识,希望对你有一定的参考价值。

为了增强容错性和高可用,避免上游RDD被重复计算的大量时间开销,Spark RDD设计了包含多种存储级别的缓存和持久化机制,主要有三个概念:Cache、Persist、Checkout。

1、存储级别介绍(StorageLevel)

存储级别以一个枚举类StorageLevel定义,分为以下12种:

StorageLevel枚举类存储级别
存储级别 使用空间 CPU时间 是否在内存中 是否在磁盘上 备注
NONE
不使用任何存储
DISK_ONLY
只存在磁盘上
DISK_ONLY_2
数据存2份
MEMORY_ONLY
只存在内存中
MEMORY_ONLY_2
数据存2份
MEMORY_ONLY_SER
数据序列化存储
MEMORY_ONLY_SER_2
数据序列化后存2份
MEMORY_AND_DISK
中等 部分 部分 若数据在内存中放不下,就溢出写到磁盘上
MEMORY_AND_DISK_2
中等 部分 部分 数据存2份
MEMORY_AND_DISK_SER
部分 部分 数据序列化后,先存内存,内存放不下就溢写到磁盘
MEMORY_AND_DISK_SER_2
部分 部分 数据存2份
OFF_HEAP
       

使用JVM堆外内存,利用java unsafe API实现的内存管理

  • 优点:在内存有限时,可以减少频繁GC及不必要的内存消耗(减少内存的使用,),提升程序性能。
  • 缺点:没有数据备份,也不能像alluxio那样保证数据高可用,丢失数据则需要重新计算。

 

2、Cache 和 Persist 的区别

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

上面是RDD的cache和persist的源代码,可以看出,cache方法本质上调用了空参数的persist方法,而空参数的persist方法又调用了“MEMORY_ONLY”参数的persist方法,也就是说,cache是MEMORY_ONLY级别的缓存存储,是一个特殊的persist。

3、Persist

persist方法提供了丰富的存储级别,可以满足多样性的缓存需求

  /**
   * Mark this RDD for persisting using the specified level.
   *
   * @param newLevel the target storage level
   * @param allowOverride whether to override any existing level with the new one
   */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

  /**
   * Set this RDD‘s storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet. Local checkpointing is an exception.
   */
  def persist(newLevel: StorageLevel): this.type = {
    if (isLocallyCheckpointed) {
      // This means the user previously called localCheckpoint(), which should have already
      // marked this RDD for persisting. Here we should override the old storage level with
      // one that is explicitly requested by the user (after adapting it to use disk).
      persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {
      persist(newLevel, allowOverride = false)
    }
  }

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

 persist方法包含三个实现,但可以看出,空参数的persist其实调用了单参数的persist方法,单参数的persist方法又调用了双参数的persist方法,在双参数persist中排除了一种情况,之后按照最新存储级别执行存储,存储流程的细节以后再分析。

3、Checkpoint和Persist的区别

  Persist Checkpoint
位置 persist和cache只能保存在本地的磁盘和内存中(或者堆外内存) 数据必须保存在HDFS分布式文件系统中
生命周期 cache和persist的RDD会在程序结束后被清除或者可以手动调用unpersist清除 由于在HDFS上,程序结束结束后依然存在,不会被删除
RDD血统和依赖链

persist和cache会保留RDD的血统和依赖关系,原因是这两种持久化方式是不可靠的。

如果出现Executor宕机等故障,那么持久化的数据就会丢失,那么修复后可以回溯血统重新计算RDD

HDFS天然支持高可靠存储,即checkpoint的持久化就是绝对可靠的,

所以checkpoint会折断依赖链,不需要回溯

额外Job

persist和cache有RDD血统链,不需要开启额外Job执行操作

checkpoint会通过sc.runJob()开启一个额外Job来执行RDD写入HDFS的操作

 

 

综上,可以得出,cache是一个特殊的persist,persist是保留RDD血统的不可靠持久化方式,checkpoint是安全可靠、不保留RDD血统的持久化方式; 如果不考虑OOM等异常可能性大小,且单论性能的话,Cache > Persist > Checkpoint

以上是关于RDD的cachepersistcheckpoint的区别和StorageLevel存储级别划分的主要内容,如果未能解决你的问题,请参考以下文章

50.性能调优之重构RDD架构以及RDD持久化

spark 教程一 RDD和核心概念

Spark中将一个RDD严格划分为多个RDD

从另一个 rdd 中搜索 rdd 的值

RDD算子RDD依赖关系

如何将三个 RDD 加入一个元组?