3天掌握Spark--内核调度详解
Posted 一只楠喃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3天掌握Spark--内核调度详解相关的知识,希望对你有一定的参考价值。
Spark之内核调度
内核调度之引例WordCount
Spark的核心是根据RDD来实现的,
Spark Scheduler
则为Spark核心实现的重要一环,其作用就是任务调度。
Spark的任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每个Stage中的任务发到指定节点运行
。
以词频统计WordCount
程序为例,Job执行是DAG图
运行词频统计WordCount,截取4040监控页面上DAG图:
当RDD调用Action函数(Job触发函数)时,产出1个Job,执行Job。
1、将Job中所有RDD按照依赖关系构建图:DAG图(有向无环图)
2、将DAG图划分为Stage阶段,分为2种类型
- ResultStage,对结果RDD进行处理Stage阶段
- ShuffleMapStage,此Stage阶段中最后1个RDD产生Shuffle
3、每个Stage中至少有1个RDD或多个RDD,每个RDD有多个分区,每个分区数据被1个Task处理
内核调度之RDD 依赖
RDD 间存在着血统继承关系,其本质上是
RDD之间的依赖(Dependency)关系
。
[每个RDD记录,如何从父RDD得到的,调用哪个转换函数]
从DAG图上来看,RDD之间依赖关系存在2种类型:
- 窄依赖,2个RDD之间依赖使用有向箭头表示
- 宽依赖,又叫Shuffle 依赖,2个RDD之间依赖使用S曲线有向箭头表示
窄依赖(Narrow Dependency)
定义:
父 RDD 与子 RDD 间的分区是一对一的
,一(父RDD)对一(子RDD)
Shuffle 依赖(宽依赖 Wide Dependency)
定义:父 RDD 中的分区可能会被多个子 RDD 分区使用,[一(父)对多(子)](
内核调度之DAG和Stage
在Spark应用执行时,每个Job执行时(RDD调用Action函数时),依据最后一个RDD(调用Action函数RDD),依据RDD依赖关系,向前推到,构建Job中所有RDD依赖关系图,称之为DAG图。
当构建完成Job DAG图以后,继续从Job最后一个RDD开始,依据RDD之间依赖关系,将DAG图划分为Stage阶段,当RDD之间依赖为Shuffle依赖时,划分一个Stage。
- 对于窄依赖,RDD之间的数据不需要进行Shuffle,多个数据处理可以在同一台机器的内存中完
成,所以窄依赖在Spark中被划分为同一个Stage;
对于宽依赖,由于Shuffle的存在,必须等到父RDD的Shuffle处理完成后,才能开始接下来的计
算,所以会在此处进行Stage的切分。
可以运行词频统计WordCount查看对应DAG图和Stage阶段
把DAG划分成互相依赖的多个Stage,划分依据是RDD之间的宽依赖,Stage是由一组并行的Task组成。
1、Stage切割规则:从后往前,遇到宽依赖就切割Stage。
2、Stage计算模式:pipeline管道计算模式
pipeline只是一种计算思想、模式,来一条数据然后计算一条数据,把所有的逻辑走完,然后落地。
以词频统计WordCount为例:
从HDFS上读取数据,每个Block对应1个分区,当从Block中读取一条数据以后,经过flatMap、map和reduceByKey操作,最后将结果数据写入到本地磁盘中(Shuffle Write)。
block0: hadoop spark spark
|textFile
RDD-0 hadoop spark spark
|flatMap
RDD-1 hadoop\\spark\\spark
|map
RDD-2 (hadoop, 1)(spark, 1)(spark, 1)
|reduceByKey
写入磁盘 hadoop, 1 || spark, 1\\ spark, 1
3、准确的说:一个task处理一串分区的数据,整个计算逻辑全部走完
内核调度之Spark Shuffle
MapReduce框架中Shuffle过程,整体流程图如下:
Spark在DAG调度阶段会将一个Job划分为多个Stage
,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。
Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。
Stage划分为2种类型:
- 1)、ShuffleMapStage,在Spark 1个Job中,除了最后一个Stage之外,其他所有的Stage都是此类型
- 将Shuffle数据写入到本地磁盘,ShuffleWriter
- 在此Stage中,所有的Task称为:ShuffleMapTask
- 2)、ResultStage,在Spark的1个Job中,最后一个Stage,对结果RDD进行操作
- 会读取前一个Stage中数据,ShuffleReader
- 在此Stage中,所有的Task任务称为ResultTask。
[ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。](
内核调度之Job 调度流程
当启动Spark Application的时候,运行MAIN函数,首先创建SparkContext对象(构建DAGScheduler
和TaskScheduler
)。
-
第一点、
DAGScheduler
实例对象- 将每个Job的DAG图划分为Stage,依据RDD之间依赖为宽依赖(产生Shuffle)
-
第二点、
TaskScheduler
实例对象- 调度每个Stage中所有Task:
TaskSet
,发送到Executor上执行 - 每个Stage中会有多个Task,所有Task处理数据不一样(每个分区数据被1个Task处理),但是处理逻辑一样的。
- 将每个Stage中所有Task任务,放在一起称为
TaskSet
。
当RDD调用Action
函数(比如count、saveTextFile或foreachPartition)时,触发一个Job执行,调度中流程如下图所示:
Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。
- 调度每个Stage中所有Task:
-
1)、DAGScheduler负责Stage级的调度,主要是将DAG切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。
-
2)、TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中
SchedulerBackend
有多种实现,分别对接不同的资源管理系统。 -
Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度。
一个Spark应用程序包括Job、Stage及Task:
第一、Job是以Action方法为界,遇到一个Action方法则触发一个Job;
第二、Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
第三、Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
内核调度之Spark 基本概念
Spark Application运行时,涵盖很多概念,主要如下表格:
内核调度之并行度
在Spark Application运行时,并行度可以从两个方面理解:
- 1)、资源的并行度:由
节点数(executor)和cpu数(core)
决定的 - 2)、数据的并行度:task的数据,
partition大小
Task数目要是core总数的2-3倍为佳
参数spark.defalut.parallelism默认是没有值的,如果设置了值,是在shuffle的过程才会起作用
在实际项目中,运行某个Spark Application应用时,需要设置资源,尤其Executor个数和CPU核数,如何计算?
- 首先确定总的CPU Core核数,依据数据量(原始数据大小)及考虑业务分析中数据量
- 再确定Executor个数,假定每个Executor核数,获取个数
- 最后确定Executor内存大小,[一般情况下,每个Executor内存往往是CPU核数2-3倍](
以上是关于3天掌握Spark--内核调度详解的主要内容,如果未能解决你的问题,请参考以下文章