Spark复习-RDD

Posted 张小小凡

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark复习-RDD相关的知识,希望对你有一定的参考价值。

1.RDD的介绍

RDD叫做弹性分布式数据集,但是他本身并不保存数据。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同的节点上
  • 数据集:RDD封装了计算逻辑,不包含数据
  • 可分区并行计算

2.核心属性

  • 分区列表:RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
  • 分区计算函数:对每个分区进行相同的逻辑处理
  • RDD之间的依赖关系也叫血缘关系,这个非常重要后续再详细介绍
  • 分区器(可选)
  • 首选位置(可选)

2.1分区器

类似于kafka中的分区,如图中把数据分为1,2 和3,4。也可以分成1,3和2,4。如果数据是kv形的,可以根据k来进行分区。需要自己自定义

2.2首选位置

如图封装好的Task1为什么要发给Executor1为什么不发给Executor2呢。这里有一个首选位置的规则。因为我们读取的数据是分布式的存在各个节点上的。如果1,2数据在Executor所在的节点上,那么把task1发给Executor1就是最优的选择。省去了把数据发给Executor2所在节点产生的网络io的消耗。

当然某些情况下也会把task1发给Executor2。比如Executor1所在节点没有运行资源了,被别的任务占Task1会等待一段时间,如果超时会进行第二优选择,发给离Executor1所在节点拓扑距离最近的节点执行。

RDD读取数据处理的流程。不管是textfile读取文件,还是sparksql读取数据库,还是sparkstreaming读取流式数据,最后都会转换成RDD执行。

3.RDD数据切分的规则

3.1 rdd从内存中创建时

源码:

    def positions(length: Long, numSlices: Int): Iterator[(IntInt)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }

python实现:

a=[1,2,3,4,5]

def slice(list,numSlices):
    result=[]
    length=len(list)# 数据集的个数
    
    for i in range(0,numSlices):
        start=int(i*length/numSlices)
        end=int((i+1)*length/numSlices)
        result.append((start,end))
    return result

ret=slice(a,3)
结果为:[(01), (13), (35)]

3.2 rdd从文件中创建时

贴一部分代码吧:

long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining;
String[][] splitHosts;
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
}
if (bytesRemaining != 0L) {
    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}

当我们使用sc.textFile创建rdd时,底层调用的其实是hadoop读取文件的形式。

首先计算文件的总字节数size,用size/我们希望的分区数,当不能整除时。如果余数<平均分区字节数*0.1时,不会再增加分区直接合并到最后一个分区,这样做是防止产生小文件。当>0.1时,会增加一个分区。所以实际的分区数可能是我们指定的分区数+1。

hadoop读取文件是一行一行读的,并且索引不会重复读取。所以会出现有的分区没有数据的情况。

4.总结

下回分享RDD的各种算子,也是开发中最重要的部分。

转载美三代  点赞富一生



以上是关于Spark复习-RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark闭包与序列化

Spark编程实战-词频统计

Spark编程实战-词频统计

RDD 与Spark 生产代码的数据集

有spark大佬知道下面这个代码哪里可以优化吗?

值 toDS 不是 org.apache.spark.rdd.RDD 的成员