Spark-数据持久化操作
Posted Mr.zhou_Zxy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-数据持久化操作相关的知识,希望对你有一定的参考价值。
Spark-数据持久化操作
cache:将数据临时存储在内存中进行数据重用
涉及内用溢出问题,数据不安全
会在血缘关系中添加新的依赖
一旦出现问题,可以重头读取文件
persist:将数据临时存储在磁盘文件中进行数据重用
涉及磁盘IO,性能较低,但是数据安全
如果作业执行完毕,临时保存的数据文件就会丢失
checkpoint:将数据长久的保存在磁盘文件中进行数据重用
涉及磁盘IO,性能较低,但是数据安全
为了提高效率,和cache联合使用
执行过程中,会切断血缘关系,重新建立血缘关系
checkpoint等同于切换数据源
- Code
package com.zxy.spark.Streaming.day005
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object demo2 {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("demo2").setMaster("local[*]"))
//1.checkpoint
sc.setCheckpointDir("date/")
val list = List("hello word","hello world")
val rdd: RDD[String] = sc.makeRDD(list)
val flatRDD: RDD[String] = rdd.flatMap(_.split("\\\\s+"))
val mapRDD: RDD[(String, Int)] = flatRDD.map(word => {
println("test")
(word, 1)
})
//2.cache
mapRDD.cache()
//3.persist
mapRDD.persist(StorageLevel.MEMORY_ONLY)
//聚合操作
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("***********************")
//分组操作
val groupRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
}
}
以上是关于Spark-数据持久化操作的主要内容,如果未能解决你的问题,请参考以下文章