Update:Spark原理_运行过程_高级特性
Posted mediocreworld
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Update:Spark原理_运行过程_高级特性相关的知识,希望对你有一定的参考价值。
如何判断宽窄依赖:
===================================
6. Spark 底层逻辑
-
从部署图了解
Spark
部署了什么, 有什么组件运行在集群中 -
通过对
WordCount
案例的解剖, 来理解执行逻辑计划的生成 -
通过对逻辑执行计划的细化, 理解如何生成物理计划
如无特殊说明, 以下部分均针对于 |
- 部署情况
- 案例
- 逻辑执行图
- 物理执行图
6.1. 逻辑执行图生成
-
如何生成 RDD
-
如何控制 RDD 之间的关系
6.1.1. RDD 的生成
本章要回答如下三个问题
-
如何生成 RDD
-
生成什么 RDD
-
如何计算 RDD 中的数据
val sc = ...
val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop"))
val splitRDD = textRDD.flatMap(_.split(" "))
val tupleRDD = splitRDD.map((_, 1))
val reduceRDD = tupleRDD.reduceByKey(_ + _)
val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")
println(strRDD.toDebugString)
strRDD.collect.foreach(item => println(item))
- 明确逻辑计划的边界
textFile
算子的背后map
算子的背后flatMap
算子的背后textRDD
→splitRDD
→tupleRDD
- 如何生成
RDD
? - 生成哪些
RDD
? - 如何计算
RDD
中的数据 ?
6.1.2. RDD 之间的依赖关系
-
讨论什么是 RDD 之间的依赖关系
-
继而讨论 RDD 分区之间的关系
-
最后确定 RDD 之间的依赖关系分类
-
完善案例的逻辑关系图
- 什么是
RDD
之间的依赖关系? reduceByKey
算子会生成ShuffledRDD
- 整体上的流程图
-
6.1.3. RDD 之间的依赖关系详解
上个小节通过例子演示了 RDD 的分区间的关系有两种形式
-
一对一, 一般是直接转换
-
多对一, 一般是 Shuffle
本小节会说明如下问题:
-
如果分区间得关系是一对一或者多对一, 那么这种情况下的 RDD 之间的关系的正式命名是什么呢?
-
RDD 之间的依赖关系, 具体有几种情况呢?
- 窄依赖
- 宽依赖
- 如何分辨宽窄依赖 ?
-
RDD 的逻辑图本质上是对于计算过程的表达, 例如数据从哪来, 经历了哪些步骤的计算
-
每一个步骤都对应一个 RDD, 因为数据处理的情况不同, RDD 之间的依赖关系又分为窄依赖和宽依赖 *
6.1.4. 常见的窄依赖类型
常见的窄依赖其实也是有分类的, 而且宽窄以来不太容易分辨, 所以通过本章, 帮助同学明确窄依赖的类型
- 一对一窄依赖
- Range 窄依赖
- 多对一窄依赖
- 再谈宽窄依赖的区别
6.2. 物理执行图生成
-
物理图的意义
-
如何划分 Task
-
如何划分 Stage
- 物理图的作用是什么?
- 谁来计算 RDD ?
- 问题三: Task 该如何设计 ?
- 如何划分阶段 ?
- 数据怎么流动 ?
6.3. 调度过程
-
生成逻辑图和物理图的系统组件
-
Job
和Stage
,Task
之间的关系 -
如何调度
Job
- 逻辑图
- 物理图
Job
是什么 ?Job
和Stage
的关系Stage
和Task
的关系- TaskSet
6.3. Shuffle 过程
本章节重点是介绍 Shuffle
的流程, 因为根据 ShuffleWriter
的实现不同, 其过程也不同, 所以前半部分根据默认的存储引擎 SortShuffleWriter
来讲解
后半部分简要介绍一下其它的 ShuffleWriter
Shuffle
过程的组件结构- 有哪些
ShuffleWriter
? SortShuffleWriter
的执行过程
7. RDD 的分布式共享变量
-
理解闭包以及 Spark 分布式运行代码的根本原理
-
理解累加变量的使用场景
-
理解广播的使用场景
-
闭包就是一个封闭的作用域, 也是一个对象
-
Spark 算子所接受的函数, 本质上是一个闭包, 因为其需要封闭作用域, 并且序列化自身和依赖, 分发到不同的节点中运行
7.1. 累加器
- 一个小问题
-
var count = 0 val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(count += _) println(count)
上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情
先明确两件事,
var count = 0
是在 Driver 中定义的,foreach(count += _)
这个算子以及传递进去的闭包运行在 Executor 中这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量
- 全局累加器
-
Accumulators(累加器) 是一个只支持
added
(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(counter.add(_)) // 运行结果: 15 println(counter.value)
注意点:
-
Accumulator 是支持并发并行的, 在任何地方都可以通过
add
来修改数值, 无论是 Driver 还是 Executor -
只能在 Driver 中才能调用
value
来获取数值
在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况
累计器件还有两个小特性, 第一, 累加器能保证在 Spark 任务出现问题被重启的时候不会出现重复计算. 第二, 累加器只有在 Action 执行的时候才会被触发.
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .map(counter.add(_)) // 这个地方不是 Action, 而是一个 Transformation // 运行结果是 0 println(counter.value)
-
- 自定义累加器
-
开发者可以通过自定义累加器来实现更多类型的累加器, 累加器的作用远远不只是累加, 比如可以实现一个累加器, 用于向里面添加一些运行信息
class InfoAccumulator extends AccumulatorV2[String, Set[String]] { private val infos: mutable.Set[String] = mutable.Set() override def isZero: Boolean = { infos.isEmpty } override def copy(): AccumulatorV2[String, Set[String]] = { val newAccumulator = new InfoAccumulator() infos.synchronized { newAccumulator.infos ++= infos } newAccumulator } override def reset(): Unit = { infos.clear() } override def add(v: String): Unit = { infos += v } override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { infos ++= other.value } override def value: Set[String] = { infos.toSet } } @Test def accumulator2(): Unit = { val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val infoAccumulator = new InfoAccumulator() sc.register(infoAccumulator, "infos") sc.parallelize(Seq("1", "2", "3")) .foreach(item => infoAccumulator.add(item)) // 运行结果: Set(3, 1, 2) println(infoAccumulator.value) sc.stop() }
注意点:
-
可以通过继承
AccumulatorV2
来创建新的累加器 -
有几个方法需要重写
-
reset 方法用于把累加器重置为 0
-
add 方法用于把其它值添加到累加器中
-
merge 方法用于指定如何合并其他的累加器
-
-
value
需要返回一个不可变的集合, 因为不能因为外部的修改而影响自身的值
-
7.2. 广播变量
-
理解为什么需要广播变量, 以及其应用场景
-
能够通过代码使用广播变量
- 广播变量的作用
- 广播变量的API
- 广播变量的使用场景
- 扩展
-
广播变量用于将变量缓存在集群中的机器中, 避免机器内的 Executors 多次使用网络拉取数据
-
广播变量的使用步骤: (1) 创建 (2) 在 Task 中获取值 (3) 销毁
以上是关于Update:Spark原理_运行过程_高级特性的主要内容,如果未能解决你的问题,请参考以下文章
Vue原理解析(五):彻底搞懂虚拟Dom到真实Dom的生成过程
大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目(示例代