RDD的分区依赖关系机制

Posted 发量不足

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD的分区依赖关系机制相关的知识,希望对你有一定的参考价值。

目录

一.RDD的分区

二.RDD的依赖关系

三.RDD机制

1、使用persist()方法对RDD进行持久化

​编辑

2、使用cache()方法对RDD进行持久化

​编辑

三、容错机制


一.RDD的分区

RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。

各种模式下的默认分区数目如下

(1) Local模式:默认为本地机器的CPU数目,若设置了local[N].则默认为N.

(2) Standalone或者Yarn模式:在“集群中所有CPU核数总和"和“2”这两者中取较大值作为默认值。

(3) Mesos 模式:默认的分区数是8.

Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。

Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org. apache. spark. Partitioner类,并实现其中的3个方法,具体如下:

(1) def numPartitions:Int:用于返回创建的分区个数。

(2) def getPartition(Key:Any):用于对输人的Key做处理,并返回该Key的分区ID, 分区ID的范围是0~ numPartitions 1.

(3) equals (other: Any):用于Spark判断自定义的Partitioner 对象和其他的 Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。

二.RDD的依赖关系

RDD之间具有依赖的关系。

RDD与它所依赖的RDD的依赖关系有两种类型,分别是

窄依赖(narrow dependency)宽依赖(wide dependency)

窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。窄依赖的表现一般分为两类:第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区,也就是说,一个父RDD的一个分区不可能对应一个子RDD的多个分区。

当RDD执行map、filter、union、和join操作时,都会产生窄依赖。

宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即One ToManyDependecies。(当RDD进行groupByKey和join操作时,会产生宽依赖)

join算子操作既可以属于窄依赖,也可以属于宽依赖。当join算子操作后,分区数量没有变化则为窄依赖(如join with inputs co partitioned,输人协同划分);当join算子操作后,分区数量发生变化则为宽依赖(如join with inputs not corpartitioned,输人非协同划分)。

三.RDD机制

Spark为RDD提供了两个重要的机制,分别是特久化机制(即缓存机制)和容错机制

RDD是采用惰性求值(即每次调用行动算子操作,都会从头开始计算)

RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。

①persist()方法的存储级别是通过StorageLevel对象(Scala Java、Python)设置的。

②cache()方法的存储级别是使用默认的存储级别(即StorageLevel. MEMORY ONLY(将反序列化的对象存人内存))。

                 持久化RDD的存储级别

MEMORY_ ONLY

默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算

MEMORY_AND_DISK

将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取

MEMORY_ONLY_SER

将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时更占CPU

MEMORY_AND_DISK_SER

与MEMORY ONLY SER类似,但是当内存放不下时则溢出到磁盘,而不是每次需要时重新计算它们

DISK_ONLY

仅将RDD分区全部存储到磁盘上

MEMORY_ONLY_2 MEMORY_AND_DISK_2

与上面的级别相同。若加上后缀2,代表的是将每个持久化的数据都复制一份副本,并将副本保存到其他节点上

OFF_HEAP(实验性)

与MEMORY ONLY SER类似,但将数据存储在堆外内存中(这需要启用堆外内存)

1、使用persist()方法对RDD进行持久化

定义一个列表list,通过该列表创建一个RDD,然后通过persist持久化操作和算子操作统计RDD中的元素个数以及打印输出RDD中所有的元素。

代码:

import org.apache.spark.storage.StorageLevel

val testlist = List("hadoop","Python","Spark","Java")

val listRDD = sc.parallelize(testlist)

listRDD.persist(StorageLevel.DISK_ONLY)

println(listRDD.count())

println(listRDD.collect().mkString(","))

 

2、使用cache()方法对RDD进行持久化

代码:

val list = List("hadoop","Python","Spark","Java")

val listRDD = sc.parallelize(list)

listRDD.cache()

println(listRDD.count())

println(listRDD.collect().mkString(","))

 

三、容错机制

RDD提供了两种故障恢复的方式,分别是血统(lineage)方式设置检查点(checkpoint)方式

①血统方式,主要是根据RDD之间的依赖关系对丢失数据的RDD进行数据恢复。

②设置检查点方式,本质上是将RDD写人磁盘进行存储。

以上是关于RDD的分区依赖关系机制的主要内容,如果未能解决你的问题,请参考以下文章

Spark宽依赖窄依赖

RDD的三个机制

Spark shuffle 机制

窄依赖与宽依赖&stage的划分依据

spark 中划分stage的思路

Spark(10)——Spark的Stage如何划分