Spark-数据持久化操作

Posted Mr.zhou_Zxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-数据持久化操作相关的知识,希望对你有一定的参考价值。

Spark-数据持久化操作

cache:将数据临时存储在内存中进行数据重用
涉及内用溢出问题,数据不安全
会在血缘关系中添加新的依赖
一旦出现问题,可以重头读取文件

persist:将数据临时存储在磁盘文件中进行数据重用
涉及磁盘IO,性能较低,但是数据安全
如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint:将数据长久的保存在磁盘文件中进行数据重用
涉及磁盘IO,性能较低,但是数据安全
为了提高效率,和cache联合使用
执行过程中,会切断血缘关系,重新建立血缘关系
checkpoint等同于切换数据源

  • Code
package com.zxy.spark.Streaming.day005

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object demo2 {
    def main(args: Array[String]): Unit = {
        
        val sc = new SparkContext(new SparkConf().setAppName("demo2").setMaster("local[*]"))
        //1.checkpoint
        sc.setCheckpointDir("date/")
        
        val list = List("hello word","hello world")
        val rdd: RDD[String] = sc.makeRDD(list)
        val flatRDD: RDD[String] = rdd.flatMap(_.split("\\\\s+"))
        val mapRDD: RDD[(String, Int)] = flatRDD.map(word => {
            println("test")
            (word, 1)
        })
        //2.cache
        mapRDD.cache()
        //3.persist
        mapRDD.persist(StorageLevel.MEMORY_ONLY)
        //聚合操作
        val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
        reduceRDD.collect().foreach(println)
        println("***********************")
        //分组操作
        val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
        groupRDD.collect().foreach(println)
        
    }
}


在这里插入图片描述

以上是关于Spark-数据持久化操作的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何加速 foreachRDD?

spark中要想保留流的状态怎么处理用哪种方式缓存

生成 Spark 模式代码/持久化和重用模式

Spark调优

片段中的Firebase数据不是持久的,会重新下载

spark的持久化和共享变量