Spark大数据之深度理解RDD的内在逻辑(5000字案例干货!)

Posted 传智教育官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark大数据之深度理解RDD的内在逻辑(5000字案例干货!)相关的知识,希望对你有一定的参考价值。


一、深入RDD

目标:深入理解RDD的内在逻辑,以及RDD的内部属性(由什么组成)

1. 案例

需求:

给订一个网站的访问记录,俗称Access log
计算机其中出现的独立IP,以及访问次数

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .sortBy(item => item._2, false)
  .take(10)

result.foreach(item => println(item))

针对这个小案例,我们问出互联网关联但是又方向不同的五个问题。

1.1 假设要针对整个网站的历史数据进行梳理,量有1T,如何处理?

放在集群中, 利用集群多台计算机来并行处理

1.2 如何放在集群中运行

在这里插入图片描述
简单来说,并行计算就是同时使用多个计算资源解决一个问题,四个要点如下:
要解决的问题必须可以分解为多个可以并发计算的部分

每个部分要可以在不同处理器上被同时执行

需要一个共享内存的机制

需要一个总体上的协作机制来进行调度

3. 如何放在集群中的话,可能要整个计算任务进行分解,如何分解?

在这里插入图片描述
概述:
对于 HDFS 中的文件, 是分为不同的 Block 的

在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元

扩展
RDD 并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可

RDD 至少是需要可以 分片 的, 因为HDFS中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片也意味着 可以并行计算

1.4 移动数据不如移动计算是一个基础的优化,如何做到?

在这里插入图片描述

1.5 在集群中运行,需要很多节点之间的配合,出错概率也高,出错怎么办?

在这里插入图片描述

RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决
缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制

记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存

如何通过父级 RDD 来恢复?
记录 RDD2 的父亲是 RDD1

记录 RDD2 的计算函数, 例如记录 RDD2 = RDD1.map(…​), map(…​) 就是计算函数

当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2

1.6 假如任务特别复杂流程长,有很多RDD之间有依赖关系如何优化?

在这里插入图片描述
上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态

在 Spark 中有两个手段可以做到
缓存
Checkpoint

二、再谈RDD

目标
1.理解RDD为什么会出现
2.理解RDD的主要特点
3.理解RDD的五大属性

2.1 RDD为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?
306061ee343d8515ecafbce43bc54bc6

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享

这种方式明显比较低效

RDD 如何解决迭代计算非常低效的问题呢?
在这里插入图片描述

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现

// 线性回归
val points = sc.textFile(...)
	.map(...)
	.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
    val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
    	.reduce(_ + _)
    w -= gradient
}

在这个例子中,大致进行了10000次迭代,如果在MapReduce中实现,可能需要运行很多job,每个job之间都要通过 HDFS 共享结果, 熟快熟慢一窥便知

2.2 RDD的特点

RDD 不仅是数据集, 也是编程模型
RDD 即是一种数据结构, 同时也提供了上层 API, 同时 RDD 的 API 和 Scala 中对集合运算的 API 非常类似, 同样也都是各种算子

在这里插入图片描述

RDD 的算子大致分为两类:

Transformation 转换操作, 例如 map flatMap filter 等

Action 动作操作, 例如 reduce collect show 等

执行 RDD 的时候, 在执行到转换操作的时候, 并不会立刻执行, 直到遇见了 Action 操作, 才会触发真正的执行, 这个特点叫做 惰性求值

RDD 可以分区

在这里插入图片描述

RDD 是只读的

在这里插入图片描述
RDD 是只读的, 不允许任何形式的修改. 虽说不能因为 RDD 和 HDFS 是只读的, 就认为分布式存储系统必须设计为只读的. 但是设计为只读的, 会显著降低问题的复杂度, 因为 RDD 需要可以容错, 可以惰性求值, 可以移动计算, 所以很难支持修改.

RDD2 中可能没有数据, 只是保留了依赖关系和计算函数, 那修改啥?

如果因为支持修改, 而必须保存数据的话, 怎么容错?

如果允许修改, 如何定位要修改的那一行? RDD 的转换是粗粒度的, 也就是说, RDD 并不感知具体每一行在哪.

RDD 是可以容错的

在这里插入图片描述
RDD 的容错有两种方式

保存 RDD 之间的依赖关系, 以及计算函数, 出现错误重新计算

直接将 RDD 的数据存放在外部存储系统, 出现错误直接读取, Checkpoint

2.3 什么叫做弹性分布式数据集

分布式
RDD 支持分区, 可以运行在集群中

弹性

RDD 支持高效的容错

RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中

数据集

RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数

RDD 也可以缓存起来, 相当于存储具体数据

总结RDD的五大特性

首先整理一下上面所提到的 RDD 所要实现的功能:

  1. RDD 有分区
  2. RDD 要可以通过依赖关系和计算函数进行容错
  3. RDD 要针对数据本地性进行优化
  4. RDD 支持 MapReduce 形式的计算, 所以要能够对数据进行 Shuffled

对于 RDD 来说, 其中应该有什么内容呢? 如果站在 RDD 设计者的角度上, 这个类中, 至少需要什么属性?

  • Partition List 分片列表, 记录 RDD 的分片, 可以在创建 RDD 的时候指定分区数目, 也可以通过算子来生成新的RDD 从而改变分区数目
  • Compute Function 为了实现容错, 需要记录 RDD 之间转换所执行的计算函数
  • RDD Dependencies RDD 之间的依赖关系, 要在 RDD 中记录其上级 RDD 是谁, 从而实现容错和计算
  • Partitioner 为了执行 Shuffled 操作, 必须要有一个函数用来计算数据应该发往哪个分区
  • Preferred Location 优先位置, 为了实现数据本地性操作, 从而移动计算而不是移动存储, 需要记录每个 RDD分区最好应该放置在什么位置

以上是关于Spark大数据之深度理解RDD的内在逻辑(5000字案例干货!)的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Spark:Spark调优之RDD算子调优

计算框架 Spark 基础之 RDD 操作

大数据之Spark:Spark Core

Spark之RDD本质

大数据之Spark:Spark调优之RDD算子调优

Spark快速大数据分析之RDD基础