Spark之RDD
Posted miranda-wu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之RDD相关的知识,希望对你有一定的参考价值。
一、什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
1.1 RDD属性
(1)Partition
即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的集群的CPU Core的总数目。
(2)一个计算每个分区的函数。
Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)一个Partitioner,即RDD的分片函数。控制分区数和分区策略
当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)每个数据分区的地址列表
存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
1.2 RDD特点
RDD是一种分布式内存抽象。RDD限制应用执行批量写,这样有利于实现有效的容错。RDD可以使用Lineage恢复分区,基本没有检查点开销,失败时只需要重新计算丢失的RDD分区,就可以在不同节点上并行执行,不需要roll back这个程序。
RDD还支持备份任务,来处理落后的任务(运行很慢的节点)
RDD的批量操作是根据数据存放的位置来调度任务,尽量减少数据的传输,从而提高性能
RDD的扫描操作,如果内存不足以缓存整个RDD,就进行部分缓存,将容乃不下的分区存储大磁盘RDD支持粗粒度和细粒度的读操作。RDD上的很多函数操作(如count和collect等)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD也支持细粒度操作,即在哈希或范围分区的RDD上执行关键字查找。
1.3 WordCount粗图解RDD
二、RDD的创建方式
2.1 通过读取文件生成
val file= sc.textFile("/spark/hello.txt")
2.2 通过并行化的方式创建
scala> val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26 scala>
2.3 其他
读取数据库、或者通过其他RDD转换而来
三、RDD算子
算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
运行:在Spark数据输入形成(这里可能并不准确)RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
RDD支持两类算子:
3.1 Transformation算子
主要做的是就是将一个已有的RDD生成另外一个RDD。Transformation具有lazy特性(延迟加载)。Transformation算子的代码不会真正被执行。只有当我们的程序里面遇到一个action算子的时候,代码才会真正的被执行。这种设计让Spark更加有效率地运行。在Transformations算子中再将数据类型维度细分为:Value数据类型和Key-Value对数据类型的Transformations算子。Value型数据的算子封装在RDD类中可以直接使用,Key-Value对数据类型的算子封装于PairRDDFunctions类中,用户需要引入importorg.apache.spark.SparkContext._才能够使用。进行这样的细分是由于不同的数据类型处理思想不太一样,同时有些算子是不同的。
常用的Transformation算子:
转换算子 | 说明 |
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
3.2 Action算子
本质上在Action算子中通过SparkContext执行提交作业的runJob操作,触发了RDDDAG的执行。
常用的Action算子:
Action算子 | 说明 |
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
foreach() | 在数据集的每一个元素上,运行函数func进行更新 |
take() | 返回一个由数据集的前n个元素组成的数组 |
四、例程
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SparkWordCountWithScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() /** * 如果这个参数不设置,默认认为你运行的是集群模式 * 如果设置成local代表运行的是local模式 */ conf.setMaster("local") //设置任务名 conf.setAppName("WordCount") //创建SparkCore的程序入口 val sc = new SparkContext(conf) //读取文件 生成RDD val file: RDD[String] = sc.textFile("E:\\hello.txt") //把每一行数据按照,分割 val word: RDD[String] = file.flatMap(_.split(",")) //让每一个单词都出现一次 val wordOne: RDD[(String, Int)] = word.map((_,1)) //单词计数 val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_) //按照单词出现的次数 降序排序 val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false) //将最终的结果进行保存 sortRdd.saveAsTextFile("E:\\result") sc.stop() }
五、宽依赖、窄依赖
5.1 RDD依赖
由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系;RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。如图所示显示了RDD之间的依赖关系。
从图中可知:
窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;(独生子女)
宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(超生)
需要特别说明的是对join操作有两种情况:
(1)图中左半部分join:如果两个RDD在进行join操作时,一个RDD的partition仅仅和另一个RDD中已知个数的Partition进行join,那么这种类型的join操作就是窄依赖,例如图1中左半部分的join操作(join with inputs co-partitioned);
(2)图中右半部分join:其它情况的join操作就是宽依赖,例如图1中右半部分的join操作(join with inputs not co-partitioned),由于是需要父RDD的所有partition进行join的转换,这就涉及到了shuffle,因此这种类型的join操作也是宽依赖。
5.2 RDD 依赖关系下的数据流
在spark中,会根据RDD之间的依赖关系将DAG图(有向无环图)划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。因此在图2中RDD C,RDD D,RDD E,RDDF被构建在一个stage中,RDD A被构建在一个单独的Stage中,而RDD B和RDD G又被构建在同一个stage中。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;
简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中;也就是说上图中的stage1和stage2相当于mapreduce中的Mapper,而ResultTask所代表的stage3就相当于mapreduce中的reducer。
在之前动手操作了一个wordcount程序,因此可知,Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不过区别在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根据Key进行reduce,但spark除了这两个算子还有其他的算子;因此从这个意义上来说,Spark比Hadoop的计算算子更为丰富。
以上是关于Spark之RDD的主要内容,如果未能解决你的问题,请参考以下文章