Spark 内核泛指
Spark
的核心运行机制,包括
Spark
核心组件的运行机制、
Spark
任务调度机制、Spark
内存管理机制、
Spark
核心功能的运行原理等
一、部署模式
Spark 支持多种集群管理器(Cluster Manager),分别为:
1) Standalone
:独立模式,
Spark
原生的简单集群管理器,自带完整的服务,可单独部署到
一个集群中,无需依赖任何其他资源管理系统,使用
Standalone
可以很方便地搭建一个
集群;
2) Hadoop YARN
:统一的资源管理机制,在上面可以运行多套计算框架,如
MR
、
Storm
等。根据
Driver
在集群中的位置不同,分为
yarn client
(集群外)和
yarn cluster
(集群
内部)
3) Apache Mesos
:一个强大的分布式资源管理框架,它允许多种不同的框架部署在其上,
包括
Yarn
。
4) K8S :
容器式部署环境。
1、YARN 模式运行机制
(1)YARN Cluster 模式
1)
执行脚本提交任务,实际是启动一个
SparkSubmit
的
JVM
进程;
2) SparkSubmit
类中的
main
方法反射调用
YarnClusterApplication
的
main
方法;
3) YarnClusterApplication
创建
Yarn
客户端,然后向
Yarn
服务器发送执行指令:
bin/java
ApplicationMaster
;
4) Yarn
框架收到指令后会在指定的
NM
中启动
ApplicationMaster
;
5) ApplicationMaster
启动
Driver
线程
,执行用户的作业;
6) AM
向
RM
注册,申请资源;
7)
获取资源后
AM
向
NM
发送指令:
bin/java
Yarn
CoarseGrainedExecutorBackend
;
8) CoarseGrainedExecutorBackend
进程会接收消息,跟
Driver
通信,注册已经启动的
Executor
;然后启动计算对象
Executor
等待接收任务
9) Driver
线程继续执行完成作业的调度和任务的执行。
10) Driver
分配任务并监控任务的执行。
注意:
SparkSubmit
、
ApplicationMaster
和
CoarseGrainedExecutorBackend
是独立的进程;
Driver 是独立的线程;Executor
和
YarnClusterApplication
是对象。
(2)YARN Client 模式
1)
执行脚本提交任务,实际是启动一个
SparkSubmit
的
JVM
进程;
2) SparkSubmit
类中的
main
方法反射调用用户代码的
main
方法;
3)
启动
Driver
线程,执行用户的作业,并创建
ScheduleBackend
;
4) YarnClientSchedulerBackend
向
RM
发送指令:
bin/java ExecutorLauncher
;
5) Yarn
框架收到指令后会在指定的
NM
中启动
ExecutorLauncher
(实际上还是调用 ApplicationMaster 的
main
方法);
6) AM
向
RM
注册,申请资源;
7)
获取资源后
AM
向
NM
发送指令:
bin/java CoarseGrainedExecutorBackend
;
8) CoarseGrainedExecutorBackend
进程会接收消息,跟
Driver
通信,注册已经启动的
Executor
;然后启动计算对象
Executor
等待接收任务
9) Driver
分配任务并监控任务的执行。
注意:
SparkSubmit
、
ApplicationMaster
和
YarnCoarseGrainedExecutorBackend
是独立的进
程;
Executor
和
Driver
是对象。
二、通信架构
Netty:通信框架,AIO
- BIO:阻塞式IO
- NIO:非阻塞式IO
- AIO:异步非阻塞式IO
核心概念表示:
- RPCEnv:通信环境
- Backend:后台
- Endpoint:终端
Linux对AIO支持不够好,Windows支持好。Linux采用Epoll方式模仿AIO操作。简单的通信框架如图1所示:
完整的通信框架如图2所示:
三、任务调度机制
在生产环境下,Spark
集群的部署方式一般为
YARN-Cluster
模式,之后的内核分析内容
中我们默认集群的部署方式为
YARN-Cluster
模式。
Driver具体的工作流程:
Driver
线程 主 要 是 初 始 化 SparkContext
对 象,准备运行所需的上下文 , 然后一方面保 持与ApplicationMaster的
RPC
连接,通过
ApplicationMaster
申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor
上。
当 ResourceManager
向
ApplicationMaster
返回
Container
资源时,
ApplicationMaster
就尝
试在对应的
Container
上启动
Executor
进程,
Executor
进程起来后,会向
Driver
反向注册,
注册成功后保持与
Driver
的心跳,同时等待
Driver
分发任务,当分发的任务执行完毕后,
将任务状态上报给
Driver
。
其核心对象如下:
1、任务调度概述
当 Driver
起来后,
Driver
则会根据用户程序逻辑准备任务,并根据
Executor
资源情况
逐步分发任务。在详细阐述任务调度前,首先说明下
Spark
里的几个概念。一个
Spark
应用
程序包括
Job
、
Stage
以及
Task
三个概念:
1) Job
是以
Action
方法为界,遇到一个
Action
方法则触发一个
Job
;
2) Stage
是
Job
的子集,以
RDD
宽依赖
(
即
Shuffle)
为界,遇到
Shuffle
做一次划分;
3) Task
是
Stage
的子集,以并行度
(
分区数
)
来衡量,分区数是多少,则有多少个
task
。
Spark 的任务调度总体来说分两路进行,一路是
Stage
级的调度,一路是
Task
级的调度,总
体调度流程如下图所示:
Spark RDD 通过其
Transactions
操作,形成了
RDD
血缘(依赖)关系图,即
DAG
,最
后通过
Action
的调用,触发
Job
并调度执行,执行过程中会创建两个调度器:
DAGScheduler
和
TaskScheduler
。
- DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度。
- TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统。
Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、 SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。 SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控Executor的存活状况,并通知到 TaskScheduler。
2、Stage 级调度
Spark Task 的调度是由
TaskScheduler
来完成,由前文可知,
DAGScheduler
将
Stage
打
包到交给
TaskScheTaskSetduler
,
TaskScheduler
会将
TaskSet
封装为
TaskSetManager
加入到
调度队列中,
TaskSetManager
结构如下图所示。
3、Task 级调度
Spark Task 的调度是由 TaskScheduler
来完成,由前文可知,
DAGScheduler
将
Stage
打
包到交给
TaskScheTaskSetduler
,
TaskScheduler
会将
TaskSet
封装为
TaskSetManager
加入到
调度队列中,
TaskSetManager
结构如下图所示。
TaskSetManager 负 责监控 管理 同一 个
Stage
中的
Tasks
,
TaskScheduler
就是以
TaskSetManager
为单元来调度任务。
前面也提到,TaskScheduler
初始化后会启动
SchedulerBackend
,它负责跟外界打交道,
接收
Executor
的注册信息,并维护
Executor
的状态,所以说
SchedulerBackend
是管
“
粮食
”
的,同时它在启动后会定期地去
“
询问
”TaskScheduler
有没有任务要运行,也就是说,它会定
期地
“
问
”TaskScheduler“
我有这么余粮,你要不要啊
”
,
TaskScheduler
在
SchedulerBackend“
问
”
它的时候,会从调度队列中按照指定的调度策略选择
TaskSetManager
去调度运行
(1)调度策略
TaskScheduler 支持两种调度策略,
一种是
FIFO
,也是默认的调度策略,另一种是
FAIR
。
在
TaskScheduler
初始化过程中会实例化
rootPool
,表示树的根节点,是
Pool
类型。
1) FIFO
调度策略
如果是采用 FIFO
调度策略,则直接简单地将
TaskSetManager
按照先来先到的方式入
队,出队时直接拿出最先进队的
TaskSetManager
,其树结构如下图所示,
TaskSetManager
保
存在一个
FIFO
队列中。
2) FAIR 调度策略
FAIR 调度策略的树结构如下图所示:
FAIR 模式中有一个
rootPool
和多个子
Pool
,各个子
Pool
中存储着所有待分配的
TaskSetMagager。
在 FAIR
模式中,需要先对子
Pool
进行排序,再对子
Pool
里面的
TaskSetMagager
进行
排序,因为Pool
和
TaskSetMagager
都继承了
Schedulable
特质,因此使用相同的排序算法。
(2)本地化调度
计算和数据的位置存在不同的级别,这个级别称之为本地化级别,分为四种:
- 进程本地化:数据和计算在同一个进程中
- 节点本地化:数据和计算在同一个节点中
- 机架本地化:数据和计算在同一个机架中
- 任意
在调度执行时,Spark
调度总是会尽量让每个
task
以最高的本地性级别来启动,当一个
task
以
X
本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,
此时并不会马上降低本地性级别启动而是在某个时间长度内再次以
X
本地性级别来启动该
task
,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor
可能就会有
相应的资源去执行此
task
,这就在在一定程度上提到了运行性能。
(3)失败重试与黑名单机制
对于失败的 Task
,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task
池子中,否则整个
Application
失败。
在记录 Task
失败次数过程中,会记录它上一次失败所在的
Executor Id
和
Host
,这样下次再调度这个Task
时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task
上一次失败所在的
Executor Id
和
Host
,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task
了。
四、Shuffle解析
1、未优化的 HashShuffle
2、优化后的 HashShuffle
3、普通 SortShuffle
在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局 部聚合,一遍写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据 key
进行排序,排序过后的数据,会分批写入到磁盘文件中。默
认批次为
10000
条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的
方式,每次溢写都会产生一个磁盘文件,
也就是说一个
Task
过程会产生多个临时文件。
最后在每个 Task
中,将所有的临时文件合并,这就是
merge
过程,此过程将所有临时
文件读取出来,一次写入到最终文件。
意味着一个
Task
的所有数据都在这一个文件中。同
时单独写一份索引文件,标识下游各个
Task
的数据在文件中的索引,
start offset
和
end offset
。
4、bypass SortShuffle
而该机制与普通 SortShuffleManager
运行机制的不同在于:不会进行排序。也就是说,
启用该机制的最大好处在于,
shuffle write
过程中,不需要进行数据的排序操作,也就节省
掉了这部分的性能开销。
五、内存管理
Spark3.x以后将采用统一内存管理:
其中最重要的优化在于动态占用机制,其规则如下:
1)
设定基本的存储内存和执行内存区域(
spark.storage.storageFraction
参数),该设定确定了双方各自拥有的空间的范围;
2)
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;
(存储空间不足是指不足以放下一个完整的
Block
)
3)
执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后
”
归还
”
借用的空间;
4)
存储内存的空间被对方占用后,无法让对方
”
归还
”
,因为需要考虑
Shuffle
过程中的很
多因素,实现起来较为复杂。
统一内存管理的动态占用机制如图所示: