SparkRDD未持久化——持久化
Posted ibigjy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkRDD未持久化——持久化相关的知识,希望对你有一定的参考价值。
RDD的持久化策略:
cache、persist、checkpoint三种策略(持久化的单位是partition)
1、cache是persist的一个简化版,会将rdd中的数据持久化到内存中
cache = persists(StorageLevel.MEMORY_ONLY) 不进行序列化
特点:
cache的返回值 必须赋值给一个新的RDD变量, 在其他的job中直接使用这个RDD变量就可以
cache是一个懒执行(其他两个也是懒执行),必须有action类的算子触发(也就是说,实现缓存要先触发一次)
cache算子的后面不能立即添加action类算子
var RDD= RDD.cache.foreach X 错误的,这样RDD 的结果就是遍历后的
2、persist手动指定持久化级别
3、checkpoint
checkpoint会另启 一个job持久化到HDFS上 (安全性)、依赖关系会被切断。
如果RDD转换很多,可以使用checkpoint。
当使用checkpoint的时候可以先cache一把,然后再用checkpoint,这时候就会从内存中写入HDFS上(要快一些)
未持久化:
package spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class Spark_Persist {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("job_name");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> textFile = sc.textFile("E:/aa.txt");
//未做持久化1
//3123毫秒运行完8643490行
long startTime = System.currentTimeMillis();
long sumLine = textFile.count();
long endtTime = System.currentTimeMillis();
System.out.println((endtTime-startTime)+"毫秒运行完"+sumLine+"行");
//未做持久化2
//2471毫秒运行完8643490行
long startTime1 = System.currentTimeMillis();
long sumLine1 = textFile.count();
long endtTime1 = System.currentTimeMillis();
System.out.println((endtTime1-startTime1)+"毫秒运行完"+sumLine1+"行");
}
}
持久化:
package spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; public class Spark_Persist { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("job_name"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> textFile = sc.textFile("E:/aa.txt").cache(); //持久化1 //8382毫秒运行完8643490行 long startTime = System.currentTimeMillis(); long sumLine = textFile.count(); long endtTime = System.currentTimeMillis(); System.out.println((endtTime-startTime)+"毫秒运行完"+sumLine+"行"); //持久化2 //168毫秒运行完8643490行 long startTime1 = System.currentTimeMillis(); long sumLine1 = textFile.count(); long endtTime1 = System.currentTimeMillis(); System.out.println((endtTime1-startTime1)+"毫秒运行完"+sumLine1+"行"); } }
以上是关于SparkRDD未持久化——持久化的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段