Spark RDD 分布式弹性数据集

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark RDD 分布式弹性数据集相关的知识,希望对你有一定的参考价值。

参考技术A

rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。

rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。

rdd的特性总结:

分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。

另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。

上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。

如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。

用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。

从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。

HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

spark.sparkContext.textFile(" hdfs://user/local/admin.text ") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。

textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。

RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。

从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。

从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。

Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。

从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。

def compute(split: Partition, context: TaskContext): Iterator[T]

compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。

partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。

分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。

HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。

RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。

rdd中的算子可以分为两种,一个是transformation, 一个是action算子。

1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

Spark弹性分布式数据集RDD

     RDD(Resilient Distributed Dataset)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。

   RDD的理解可以简化为:

  • 数据集:故名思议RDD是数据集合的抽象,从外部来看,RDD的确可以被看待成经过封装,带扩展特性(如容错性)的数据集合; 
  • 分布式:数据的计算并非只局限于单个节点,而是多个节点之间协同计算是到; 
  • 弹性:在计算处理过程中,机器的内存不够时,它会和硬盘进行数据交换,某种程度上会减低性能,但是可以确保计算得以继续进行

   可以把RDD理解为Scala集合的分布式版本,Scala集合位于单个JVM,而RDD通过分区分布在多个JVM,这些JVM可能跨越不同的物理机器节点。下图是RDD的一个示意图:

一、编程模型

     在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)调用转换。

     定义RDD之后,程序员就可以在动作(注:即action操作)中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。

    程序员还可以从两个方面控制RDD,即缓存和分区。用户可以请求将RDD缓存,这样运行时将已经计算好的RDD分区存储起来,以加速后期的重用。缓存的RDD一般存储在内存中,但如果内存不够,可以写到磁盘上。

    另一方面,RDD还允许用户根据关键字(key)指定分区顺序,这是一个可选的功能。目前支持哈希分区和范围分区。例如,应用程序请求将两个RDD按照同样的哈希分区方式进行分区(将同一机器上具有相同关键字的记录放在一个分区),以加速它们之间的join操作。在Pregel和HaLoop中,多次迭代之间采用一致性的分区置换策略进行优化,我们同样也允许用户指定这种优化。

  

     对于RDD,有两种类型的动作,一种是Transformation,一种是Action。它们本质区别是:

  • Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的
  • Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中

     Transformations转换操作,返回值还是一个 RDD,如 map、 filter、 union
     Actions行动操作,返回结果或把RDD持久化起来,如 count、 collect、 save

A=>B表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果作为输出

参见:http://blog.jasonding.top/2015/07/08/Spark/【Spark】弹性分布式数据集RDD
 
         http://www.jianshu.com/p/0b75ddd66999https://www.zybuluo.com/BrandonLin/note/448121

以上是关于Spark RDD 分布式弹性数据集的主要内容,如果未能解决你的问题,请参考以下文章

Spark弹性分布式数据集RDD概述

Spark弹性分布式数据集RDD

Spark RDD 分布式弹性数据集

spark RDD(弹性分布式数据集)可以更新吗?

Spark弹性分布式数据集(Resilient Distributed Dataset)

Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集)