Spark复习-RDD
Posted 张小小凡
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark复习-RDD相关的知识,希望对你有一定的参考价值。
1.RDD的介绍
RDD叫做弹性分布式数据集,但是他本身并不保存数据。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
-
弹性 -
存储的弹性:内存与磁盘的自动切换 -
容错的弹性:数据丢失可以自动恢复 -
计算的弹性:计算出错重试机制 -
分片的弹性:可根据需要重新分片 -
分布式:数据存储在大数据集群不同的节点上 -
数据集:RDD封装了计算逻辑,不包含数据 -
可分区并行计算
2.核心属性
-
分区列表:RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。 -
分区计算函数:对每个分区进行相同的逻辑处理 -
RDD之间的依赖关系也叫血缘关系,这个非常重要后续再详细介绍 -
分区器(可选) -
首选位置(可选)
2.1分区器
类似于kafka中的分区,如图中把数据分为1,2 和3,4。也可以分成1,3和2,4。如果数据是kv形的,可以根据k来进行分区。需要自己自定义
2.2首选位置
如图封装好的Task1为什么要发给Executor1为什么不发给Executor2呢。这里有一个首选位置的规则。因为我们读取的数据是分布式的存在各个节点上的。如果1,2数据在Executor所在的节点上,那么把task1发给Executor1就是最优的选择。省去了把数据发给Executor2所在节点产生的网络io的消耗。
当然某些情况下也会把task1发给Executor2。比如Executor1所在节点没有运行资源了,被别的任务占Task1会等待一段时间,如果超时会进行第二优选择,发给离Executor1所在节点拓扑距离最近的节点执行。
RDD读取数据处理的流程。不管是textfile读取文件,还是sparksql读取数据库,还是sparkstreaming读取流式数据,最后都会转换成RDD执行。
3.RDD数据切分的规则
3.1 rdd从内存中创建时
源码:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
python实现:
a=[1,2,3,4,5]
def slice(list,numSlices):
result=[]
length=len(list)# 数据集的个数
for i in range(0,numSlices):
start=int(i*length/numSlices)
end=int((i+1)*length/numSlices)
result.append((start,end))
return result
ret=slice(a,3)
结果为:[(0, 1), (1, 3), (3, 5)]
3.2 rdd从文件中创建时
贴一部分代码吧:
long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining;
String[][] splitHosts;
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
}
if (bytesRemaining != 0L) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}
当我们使用sc.textFile创建rdd时,底层调用的其实是hadoop读取文件的形式。
首先计算文件的总字节数size,用size/我们希望的分区数,当不能整除时。如果余数<平均分区字节数*0.1时,不会再增加分区直接合并到最后一个分区,这样做是防止产生小文件。当>0.1时,会增加一个分区。所以实际的分区数可能是我们指定的分区数+1。
hadoop读取文件是一行一行读的,并且索引不会重复读取。所以会出现有的分区没有数据的情况。
4.总结
下回分享RDD的各种算子,也是开发中最重要的部分。
转载美三代 点赞富一生
以上是关于Spark复习-RDD的主要内容,如果未能解决你的问题,请参考以下文章