Spark-RDD
Posted sanxiandoupi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark-RDD相关的知识,希望对你有一定的参考价值。
RDD(Resilient Distributed Datasets)弹性分布式数据集,是在集群应用中分享数据的一种高效,通用,容错的抽象,是Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作。
RDD是只读的,不可变的数据集。RDD也是容错的,假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,依赖关系中记录算子和分区,可以重新生成。RDD实现分布式数据集容错方法有两种:数据检查点和记录更新。同时RDD是高效的,不需要物化。它也是分区记录的集合,可以缓存的。
每个RDD都包含有一组RDD分区(partition),数据集的原子组成部分,还有对父RDD的一组依赖,这些依赖描述了RDD的Lineage;以及一个函数,说明在父RDD上执行何种计算;还包含元数据,描述分区模式和数据存放的位置。
RDD依赖
RDD之间的依赖关系分为宽依赖和窄依赖两类。对于窄依赖,子RDD的每个分区依赖于常数个父分区,它与数据规模无关。输入输出是一对一的算子,但是其中一种方式的结果RDD的分区结构不变,主要是map,flatMap。但是如union,coalesce结果RDD的分区结构会发生变化。对于宽依赖,子RDD的每个分区都依赖于所有的父RDD分区。
对于两种依赖关系,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle。窄依赖能够更有效地进行失效节点的恢复,重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
RDD特征
同时RDD有五个特征,其中分区,一系列的依赖关系和函数是三个基本特征,最佳位置和分区策略是可选。RDD是移动计算而不是移动数据。
RDD和spark之间,RDD是一种具有容错性基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。
RDD与Mapreduce
Hadoop Mapreduce
1.不适于大量的迭代
2.不适于交互式查询
基于数据流方式不能复用曾经的结果或中间计算结果
RDD
1.自动进行内存,磁盘切换
2.高效容错
3.Task失败,会进行特定次数的重试
4.Stage失败,会自动进行特定次数的重试
5.只会计算失败的分片
6.checkpoint persist
7.数据分配的高度弹性
RDD为何高效
RDD是不可变集合类型,且操作为lazy
RDD的写操作为粗粒度,读操作可以是粗粒度也可为细粒度
所有的RDD操作都返回迭代器,可以让框架集成
创建RDD
1.集合创建
程序中一个已有的集合传给SparkContext的parallelize()
1 | Scala创建 |
2.本地文件系统
1 | Scala |
3.HDFS
1 | Scala |
4.基于数据库 SQL NOSQL
5.基于S3
6.基于数据流
RDD操作
转化操作
返回新的RDD,RDD的转化操作都是惰性求值,事实上为回溯求值操作
map() flatMap() filter() distinct() sample()
union() intersection() subtract() cartesian()
filter
大专栏 Spark-RDDe">1 | JavaRDD filter = parallelize.filter(new Function<Integer, Boolean>() { |
foreach
1 | filter.foreach(new VoidFunction(){ |
flatMap()
1 | JavaRDD<String> flatMap = mapToPair.flatMap(new FlatMapFunction<Tuple2<Integer,String>,String>(){ |
Map()
1 | JavaPairRDD<Integer, String> mapToPair = textFile.mapToPair(new PairFunction<String,Integer,String>(){ |
行动操作
不产生RDD
collect() count() countByValue() take(num) top(num) reduce(func) foreach(func)
controller
persist cache checkpoint
persist适用条件
某步计算耗时,计算链条过长,checkpoint所在的RDD必须持久化(触发job),shuffle之后,shuffle之前(系统默认)
1 | val cached=sc.textFile("path").flatMap(_.split(" ")).map(word=>(word,1)).reduceBykey(_+_).cache |
汇总
1.reduce为Action操作,不产生RDD,看源码中有无runjob;
每一个shuffle产生一个新的RDD,触发stage
2.并行度问题
并行=分区=task
Spark的并行度看内存使用,看CPU,与数据规模无关
并行度=文件大小/128
3.RDD上的一系列数据分片的计算逻辑相同
以上是关于Spark-RDD的主要内容,如果未能解决你的问题,请参考以下文章