Spark-RDD

Posted sanxiandoupi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-RDD相关的知识,希望对你有一定的参考价值。

RDD(Resilient Distributed Datasets)弹性分布式数据集,是在集群应用中分享数据的一种高效,通用,容错的抽象,是Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作。

RDD是只读的,不可变的数据集。RDD也是容错的,假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,依赖关系中记录算子和分区,可以重新生成。RDD实现分布式数据集容错方法有两种:数据检查点和记录更新。同时RDD是高效的,不需要物化。它也是分区记录的集合,可以缓存的。
每个RDD都包含有一组RDD分区(partition),数据集的原子组成部分,还有对父RDD的一组依赖,这些依赖描述了RDD的Lineage;以及一个函数,说明在父RDD上执行何种计算;还包含元数据,描述分区模式和数据存放的位置。

RDD依赖

RDD之间的依赖关系分为宽依赖和窄依赖两类。对于窄依赖,子RDD的每个分区依赖于常数个父分区,它与数据规模无关。输入输出是一对一的算子,但是其中一种方式的结果RDD的分区结构不变,主要是map,flatMap。但是如union,coalesce结果RDD的分区结构会发生变化。对于宽依赖,子RDD的每个分区都依赖于所有的父RDD分区。
对于两种依赖关系,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle。窄依赖能够更有效地进行失效节点的恢复,重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

RDD特征

同时RDD有五个特征,其中分区,一系列的依赖关系和函数是三个基本特征,最佳位置和分区策略是可选。RDD是移动计算而不是移动数据。
RDD和spark之间,RDD是一种具有容错性基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。

RDD与Mapreduce

Hadoop Mapreduce
1.不适于大量的迭代
2.不适于交互式查询
基于数据流方式不能复用曾经的结果或中间计算结果
RDD
1.自动进行内存,磁盘切换
2.高效容错
3.Task失败,会进行特定次数的重试
4.Stage失败,会自动进行特定次数的重试
5.只会计算失败的分片
6.checkpoint persist
7.数据分配的高度弹性

RDD为何高效

RDD是不可变集合类型,且操作为lazy
RDD的写操作为粗粒度,读操作可以是粗粒度也可为细粒度
所有的RDD操作都返回迭代器,可以让框架集成

创建RDD

1.集合创建
程序中一个已有的集合传给SparkContext的parallelize()

1
2
3
4
5
Scala创建
val lines=sc.parallelize(List("padas","i like padas"))

Java创建
JavaRDD<String> lines=sc.parallelize(Arrays.asList("padas","i like padas"))

2.本地文件系统

1
2
3
4
5
Scala 
val input=sc.textFile("本地路径")

Java
JavaRDD<String> input=sc.textFile("本地路径"")

3.HDFS
1
2
3
4
5
Scala 
val input=sc.textFile("HDFS路径")

Java
JavaRDD<String> input=sc.textFile("HDFS路径"")

4.基于数据库 SQL NOSQL
5.基于S3
6.基于数据流

RDD操作

转化操作

返回新的RDD,RDD的转化操作都是惰性求值,事实上为回溯求值操作
map() flatMap() filter() distinct() sample()
union() intersection() subtract() cartesian()
filter

大专栏  Spark-RDDe">1
2
3
4
5
6
7
8
9
JavaRDD filter = parallelize.filter(new Function<Integer, Boolean>() {


public Boolean (Integer arg0) throws Exception {

return arg0>4;
}

});

foreach

1
2
3
4
5
6
7
8
9
10
filter.foreach(new VoidFunction(){


public void (Object arg0) throws Exception {

System.out.println(arg0);
}


});

flatMap()

1
2
3
4
5
6
7
8
9
10
JavaRDD<String> flatMap = mapToPair.flatMap(new FlatMapFunction<Tuple2<Integer,String>,String>(){


public Iterable<String> (Tuple2<Integer, String> arg0) throws Exception {

return Arrays.asList(arg0._2.split(" "));
}


});

Map()

1
2
3
4
5
6
7
8
9
10
JavaPairRDD<Integer, String> mapToPair = textFile.mapToPair(new PairFunction<String,Integer,String>(){


public Tuple2<Integer, String> (String arg0) throws Exception {

return new Tuple2(1,arg0);
}


});

行动操作

不产生RDD
collect() count() countByValue() take(num) top(num) reduce(func) foreach(func)

controller

persist cache checkpoint
persist适用条件
某步计算耗时,计算链条过长,checkpoint所在的RDD必须持久化(触发job),shuffle之后,shuffle之前(系统默认)

1
2
3
val cached=sc.textFile("path").flatMap(_.split(" ")).map(word=>(word,1)).reduceBykey(_+_).cache
cached.count
速度明显提高,但注意cache后别放其他算子,否则每次重新cache

汇总

1.reduce为Action操作,不产生RDD,看源码中有无runjob;
每一个shuffle产生一个新的RDD,触发stage
2.并行度问题
并行=分区=task
Spark的并行度看内存使用,看CPU,与数据规模无关
并行度=文件大小/128
3.RDD上的一系列数据分片的计算逻辑相同

以上是关于Spark-RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark-RDD 模型 以及运行原理

Spark-RDD

Spark-RDD 转换算子(Value 类型)

Spark-RDD/DataFrame/DateSet

微信小程序代码片段

VSCode自定义代码片段——CSS选择器