Spark核心机制总结
Posted Icedzzz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark核心机制总结相关的知识,希望对你有一定的参考价值。
文章目录
1. RDD
-
RDD是spark中的一个最基本的抽象,代表着一个不可变、可分区、可以并行计算的分布式数据集;
-
RDD是一个基本的抽象,是对存储在分布式文件系统上的数据操作进行代理。RDD并不存储需要计算数据,而是一个代理,你对RDD进行的操作,他会在Driver端转换成Task,下发到Executor中,计算分散在集群中的数据。RDD是抽象的,并不存储数据,而是封装记录你对数据的操作。
-
RDD计算是以分区为单位的
-
RDD算子的操作包括两种类型:Transformation和Action;初始创建都是由SparkContext来负责的
-
RDD中的所有转换(Transformation)都是延迟加载(Lazy)的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作(action)时,这些转换才会真正运行。
-
RDD支持缓存操作,由cache实现,cache可以对RDD进行持久化操作,可以让RDD保存在磁盘或者内存中,以便后续重复使用;但是没有生成新的RDD,也没有触发任务执行,只会标记该RDD分区对应的数据,在第一次触发Action时放入到内存
- cache方法实质上是调用 RDD.persist() 让 Spark 把这个 RDD 缓存下来,persist方法的StorageLevel方法定义存储规则,存储级别包括仅内存( memory only)、仅磁盘( disk only)、堆外内存( off heap)
-
检查点(checkpoint)是将 RDD 保存到磁盘上的操作,以便将来对此 RDD 的引用能直接访问磁盘上的那些中间结果,而不需要从其源头重新计算 RDD。 它与缓存类似,只是它不存储在内存中,只存在磁盘上。
-
什么是有向无环图? 在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。
-
Spark使用DAG来反映各RDD之间的依赖或血缘关系。
2. 与MapReduce对比
MapReduce的流程一般包含为map和reduce两个阶段,map/reduce可在不同分区并行执行多个任务,然而map任务对所负责的分块数据进行map处理后,并写入缓冲区,然后进行分区、排序、聚合等操作,最后将数据输出到磁盘上的不同分区。随后的reduce任务在执行时,必须要将map输出到磁盘的数据通过网络拷贝到本地内存,经过一系列归并、排序计算以后输出回文件系统中。
从上面的过程中可以看出,MapReduce的缺陷:
- 无法在并行计算的各个阶段进行有效的数据共享;
- 启动时间较长,MapReduce过程几乎什么都不做,光启动就需要20-30s;
- MapReduce会频繁地对磁盘进行读写操作,然而这些磁盘I/O并不是必须的;
为什么使用Spark?
与MapReduce不同,Spark的计算流程分为两部分:逻辑处理流程、执行阶段和执行任务划分。
Spark首先根据用户代码中的数据操作语义和顺序,转换成逻辑处理流程(数据计算语义=>输入/输出、中间数据的抽象化表示,RDD;执行过程=>DAG有向无环图),然后Spark对逻辑处理流程进行划分(宽窄依赖),生成物理执行计划(执行阶段Stage+这些任务task)。与MapReduce不同的是,一个SparkJob可以包含多个执行阶段(stage),而且每个执行阶段可以包含多种计算任务,而不是单一地将计算任务区分为map或者reduce。另外,Spark的RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。Spark的cache方法允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
另外,MapReduce部署模式中。会为每一个task启动一个JVM进程运行,而且是在task将要运行时启动JVM,而Spark会预先启动资源容器(Executor JVM),然后当需要执行task时,再在Executor JVM里启动线程运行。
3. Spark的各大组件
Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)、应用程序(Application)等部分组成
-
** Cluster Manager:** Spark的集群管理器,主要负责对整个集群资源的分配与管理。ClusterManager在YARN部署模式下为ResourceManager;在Standalone部署模式下为Master。Cluster Manager分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。
-
Worker:Spark的工作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下工作:将自己的内存、CPU等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进一步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程
-
**Executor:**主要负责任务的执行及与Worker、Driver的信息同步。
-
Driver: Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。 Driver可以运行在Application中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运行。
-
Application:用户使用Spark提供的API编写的应用程序,Application通过Spark API将进行RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。 Cluster Manager将会根据Application的资源需求,通过一级分配将Executor、内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务,Application最后通过Driver告诉Executor运行任务。
4. Spark执行流程
- 用户使用SparkContext提供的API编写Driver应用程序。
- 使用SparkContext提交的用户应用程序:首先会通过RpcEnv向集群管理器(Cluster Manager)注册应用(Application)并且告知集群管理器需要的资源数量。集群管理器根据Application的需求,给Application分配Executor资源,并在Worker上启动进程(该进程内部将创建Executor)。
- Executor所在的CoarseGrainedExecutorBackend进程在启动的过程中将通过RpcEnv直接向Driver注册Executor的资源信息,TaskScheduler将保存已经分配给应用的Executor资源的地址、大小等相关信息。
- SparkContext根据各种转换API,构建RDD之间的血缘关系和DAG,RDD构成的DAG将最终提交给DAGScheduler。
- DAGScheduler给提交的DAG创建Job,并根据RDD的依赖性质将DAG划分为不同的Stage。DAGScheduler根据Stage内RDD的Partition数量创建多个Task并批量提交给TaskScheduler。
- TaskScheduler对批量的Task按照FIFO或FAIR调度算法进行调度,然后给Task分配Executor资源。最后将Task发送给Executor由Executor执行。此外,SparkContext还会在RDD转换开始之前使用BlockManager和BroadcastManager将任务的Hadoop配置进行广播。
- 集群管理器(Cluster Manager)会根据应用的需求,给应用分配资源,即将具体任务分配到不同Worker节点上的多个Executor来处理任务的运行。
- SparkConf: 用于管理Spark应用程序的各种配置信息;
- 事件总线: SparkContext内部各组件间使用事件—监听器模式异步调用的实现;
- SparkContext:通常而言,用户开发的Spark应用程序的提交与执行都离不开SparkContex的支持。在正式提交应用程序之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务、Web UI等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。
5. 执行过程中的一些细节
- Spark中的task是以线程的方式运行的,而mapreduce是以进程的方式运行,每个task之间互相独立,独享进程资源(坏处也是,不方便task之间交互数据),而且mapreduce过程中需要不断启停task,进程的启停和初始化等操作会浪费时间,降低效率。为了数据共享和提高执行效率,Spark以线程方式进行,缺点:线程之间会有资源竞争。
- 划分stage的原因:1)如果每个操作当作一个task,那么效率太低,且容错比较困难。将job划分为stage后,stage在生成的task不会太大,也不会太小,而且是同构的,便于并行执行。2)可以将多个操作放在task中,更方便与串行、流水线式的处理。3)stage方便错误容忍,如果一个stage失效,可以重新运行这个stage,而不用运行整个job。
- RDD的计算是单向操作,属于不可变的类型,计算过程中间数据不可修改,这也是Spark的缺点之一
- RDD的分区个数由父RDD和用户决定,如果个数不指定,一般为父RDD的分区个数最大值
- task的个数通常由每个stage中最后一个RDD的分区个数决定生成task 的个数。
- Spark的操作是粗粒度的,也就是说RDD上的操作是面向分区的,每个分区上的操作是相同的,如果分区一上的数据想要分区二的数据,提高RDD是无法做到的,只能通过聚合操作汇总在一起,这也是Spark的缺点
- task分为ShuffleMapTask和ResultTask
问题:RDD内部的数据是如何进行分区?
分为三种:水平划分,Hash划分和Range划分
- 水平划分: 按record的索引进行划分,但这样做每个RDD中的元素数目和排列顺序不固定,同一个元素可能被划分到不同分区。
- Hash划分:按record的Hash值进行划分,好处在于只需要做到分区的个数,就能将数据确定性地划分到某个分区。该划分方法常用于Shuffle阶段
- Range划分:常用于排序任务,按照元素的大小关系划分不同分区。Range划分通常需要提取划分好数据区域。
task内部数据的存储与计算问题
task对于一些流水线式的操作,会在计算时只需要在内存中保留当前被处理的单个record即可,同时将结果保存至内存中,以提高task的执行效率,并减少内存使用率。
task之间的数据传递和计算问题:
stage之间的依赖关系是ShuffleDependency,下游stage中的每个task会从父RDD的每个分区中获取数据。上游的stage预先将输出数据进行划分,按分区存放,分区个数与下游task个数一致,这个过程叫Shuffle write。按照分区存放完了后,下游task将属于自己分区的数据通过网络传输获取,然后将上游不同分区的数据聚合在一起,这个过程叫Shuffle read
6. Spark的部署模式和作业提交
部署模式
- Local:运行在一台机器上, 通常是练手或者测试环境。
- Standalone:构建一个基于 Mster+Slaves 的资源调度集群, Spark 任务提交给 Master运行。 是 Spark 自身的一个调度系统
- Yarn: Spark 客户端直接连接 Yarn, 不需要额外构建 Spark 集群。 有 yarn-client 和yarn-cluster 两种模式。
yarn-client 和yarn-cluster模式的区别在于Driver进程在哪台机器中启动;
yarn-cluster:
- spark-submit提交请求后,发送消息给Yarn集群中的ResourceManager,请求启动ApplicationMaster
- ResourceManager在某个nodemanager上,启动ApplicationMater
- 在某个NodeManager上启动的ApplicationMater进程相当于Driver进程
- 而后ApplicationMater向ResourceManager申请container,启动executor
- ResourceManager分配一批container用于启动executor,而后ApplicationMater连接其他NM,启动executor
- executor启动后,向ApplicationMater反向注册
yarn-client:
- yarn-client的前两步与yarn-cluster一样
- 与yarn-cluster的主要区别在于,RM在某个nodemanager上,启动ApplicationMater,这里的ApplicationMater只是一个ExecutorLaucher,而不是Driver。Driver在spark-submit提交的本地机器进程中
- 而后ApplicationMater向ResourceManager申请container,启动executor
- ResourceManager分配一批container用于启动executor,而后ApplicationMater连接其他NM,启动executor
- 注意:此时,executor向提交spark作业的机器Driver端反向注册,而不是启动ApplicationMater的机器
yarn-client和yarn-cluster的选择问题:
- yarn-client用于测试,因为,driver运行在本地客户端,负责调度application,会与yarn集群产生超大量的网络通信,从而导致网卡流量激增,。好处在于,直接执行时,本地可以毛到所有的log。方便调试。
- yarn-cluster用于生产环境,以为driver运行在nodemanager,没有网卡流量激增的问题。缺点在于,调试不方便,本地用spark-submit提交后,看不到log,只能通过yarn applicaition -logs application_id这种命令来查看,很麻烦。
Spark On Yarn模式的优点:
1)与其他计算框架共享集群资源(Spark框架与MapReduce框架同时运行,如果不用Yarn进行资源分配,MapReduce分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
2)相较于Spark自带的Standalone模式,Yarn的资源分配更加细致。
3)Application部署简化,例如Spark,Storm等多种框架的应用由客户端提交后,由Yarn负责资源的管理和调度,利用Container作为资源隔离的单位,以它为单位去使用内存,cpu等。
4)Yarn通过队列的方式,管理同时运行在Yarn集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。
Spark的作业提交
在提交任务时的几个重要参数
master ——提交模式,local,yarn-cluster…
executor-cores —— 每个 executor 使用的内核数, 默认为 1, 官方建议 2-5 个
num-executors —— 启动 executors 的数量, 默认为 2
executor-memory —— executor 内存大小, 默认 1G
driver-cores —— driver 使用内核数, 默认为 1
driver-memory —— driver 内存大小, 默认 512
1.num-executors 线程数:一般设置在50-100之间,必须设置,不然默认启动的executor非常少,不能充分利用集群资源,运行速度慢
2.executor-memory 线程内存:参考值4g-8g,num-executor乘以executor-memory不能超过队列最大内存,申请的资源最好不要超过最大内存的1/3-1/2
3.executor-cores 线程CPU core数量:core越多,task线程就能快速的分配,参考值2-4,num-executor*executor-cores的1/3-1/2
1.spark-submit spark提交
2.--queue spark 在spark队列
3.--master yarn 在yarn节点提交
4.--deploy-mode client 选择client模型,还是cluster模式;在同一个节点用client,在不同的节点用cluster
5.--executor-memory=4G 线程内存:参考值4g-8g,num-executor乘以executor-memory不能超过队列最大内存,申请的资源最好不要超过最大内存的1/3-1/2
6.--conf spark.dynamicAllocation.enabled=true 是否启动动态资源分配
7.--executor-cores 2 线程CPU core数量:core越多,task线程就能快速的分配,参考值2-4,num-executor*executor-cores的1/3-1/2
8.--conf spark.dynamicAllocation.minExecutors=4 执行器最少数量
9.--conf spark.dynamicAllocation.maxExecutors=10 执行器最大数量
10.--conf spark.dynamicAllocation.initialExecutors=4 若动态分配为true,执行器的初始数量
11.--conf spark.executor.memoryOverhead=2g 堆外内存:处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就去调节这个参数,到至少1G(1024M),甚至说2G、4G)
12.--conf spark.speculation=true 推测执行:在接入kafaka的时候不能使用,需要考虑情景
13.--conf spark.shuffle.service.enabled=true 提升shuffle计算性能
7. 宽窄依赖
Spark通过宽窄依赖解决RDD和分区之间的数据依赖关系:父RDD的一个分区的数据分发给不同的子RDD,则为宽依赖,要进行Shuffle,反之为窄依赖(即使存在分发给子RDD的可能就是宽依赖)
- 窄依赖中,子RDD每个分区都依赖父RDD中的一部分分区,包含一对一依赖(map),区域依赖(union),多对一依赖(join,cogroup),多对多依赖(cartesian)
- 宽依赖表示新生的子RDD中分区依赖父RDD中每个分区的一部分
- 宽窄依赖的区别在于子RDD的各个分区是否完全依赖父RDD的一个或多个分区(也就是区分是不是发生Shuffle)
为什么要划分依赖?
- 明确RDD分区之间的关系,明确Spark从哪获取数据,输出到哪
- 有利于生成物理执行计划
- 如果不划分依赖,一个task中包含很多数据依赖和操作,导致划分出来的task太大,而且会出现重复计算。因此将ShuffleDependency前后的计算逻辑分开,形成不同的阶段和任务,这样就不会出现过大的task。
DAGScheduler的stage划分算法
会从触发action操作的那个rdd开始往前倒推,首先会为最后一个rdd创建一个stage,然后往前倒推的时候,如果发现对某个rdd是宽依赖,那么就会将宽依赖的那个rdd创建一个新的stage,那个rdd就是新的stage的最后一个rdd。然后依次类推,继续往前倒推,根据窄依赖或者宽依赖,进行stage的划分。直到所有的rdd全部遍历完了为止。
7. Shuffle
Shuffle机制即是,运行在不同Stage、不同结点上的task之间进行数据传递的过程。Shuffle机制分为Shuffle Write和Shuffle Read两个阶段,Shuffle Write解决上游Stage输出数据的分区问题,后者解决下游Stage从上游stage获取数据、重新组织、并为后续操作提供数据的问题。
- Shuffle的数据分区问题
stage上游分区个数跟下游stage的task个数一致,也可以自己定义,如groupByKey(numPartitions),numPartitions一般设为CPU个数的1~2倍,如果用户不定义,则和parent RDD分区个数一致 - 数据聚合问题
数据聚合的本质是将Key相同的record放在一起,这个过程可以利用HashMap进行。对于包含聚合函数的RDD操作,通过在线聚合方式进行,将上游每个record加入HashMap,同时进行func()聚合操作,并跟新相应聚合结果。在线聚合可以减少内存消耗,如果将加入HashMap和聚合的操作分两步进行,所有的record都会被存放在HashMap中,占用很大的内存空间。对于其他不包含聚合的ShuffleRDD(groupByKey)则采用在线聚合方式。 - map端(Shuffle Write)的combine问题
如reduceByKey、distinct()等包含聚合函数的算子均会在map端进行一次combine(聚合操作),对上游每个分区里单一task输出的数据进行聚合,利用HashMap,将map中的每一个record聚合成<K,func(list(V))>,然后对Hashmap中每个record进行分区,而后写入磁盘,输出到对应的分区文件中,待下游数据拉取; - Shuffle过程中的排序问题
如sortByKey、sort等算子,需要在Shuffle过程中,将数据按照Key进行排序,这个过程发生在Shuffle Read端。
排序的过程采用先聚合再排序的方法进行,即先基于HashMap进行聚合,然后将map中的record放入线性的数据结构中进行排序。这个方案的优点是:聚合和排序过程独立,灵活性高;缺点是:需要复制数据和引用,空间占用较大。
MapReduce中采用先排序在聚合的方法,先使用线性存储结构Array存储HashMap中的record,然后对Key进行排序,排序后的数据从前到后扫描聚合;这种方法缺点是:需要较大内存空间存储线性结构,而且不能使用在线聚合,效率较差; - 内存问题
在Shuffle过程中数据量过大,导致内存放不下怎么办?由于使用了HashMap对数据进行聚合和规约,在数据量大时候就会发生内存溢出。
Spark的解决方案是:使用内存+磁盘混合存储。现在内存(HashMap)中进行数据聚合,如果内存空间不足,则将内存中的数据spill到磁盘,此时空闲出来的内存继续处理新的数据,不断重复。如果需要聚合的话,内存中所聚合的只是局部聚合结果,我们需要在进行下一步数据操作之前对磁盘和内存中的数据进行全局聚合。为了加速全局聚合,我们需要将数据spill到磁盘上时进行排序,这样全局聚合才能按照顺序读取spill到磁盘上的数据,减少磁盘IO。
(1)Shuffle Write详解
Shuffle Write计算框架的顺序为:map()输出——>数据聚合——>排序——>分区
- map task计算出每个record的partitionId,并同record放入类似HashMap的数据结构中进行聚合(是否聚合可选);
- 再将HashMap中的数据放入Array的数据结构中进行排序,按partitionId/partitionId+Key进行排序(是否排序可选);
- 最后根据partitionId将数据写入不同的数据分区,存放到本地磁盘上。
对于不同算子,Spark对Shuffle Write过程进行了优化
-
不需要map端聚合和排序,BypassMergeSortShuffleWriter
这种情况最为简单,只需要实现分区即可,但具体细节在按分区写入到磁盘过程中,在内存中为每个分区添加了一个buffer,Spark根据partitionId,将record输出到buffer,而后当buffer填满时,溢写到磁盘中。分配buffer的原因:map端输出record速度很快,需要进行缓冲减少磁盘IO。
该模式的优点就是速度快,但在分区过多的情况下,消耗资源,每个分区都需要一个buffer(大小默认为32kb)。
该模式适用于分区较少的情况(默认分区个数为spark.Shuffle.sort.bypassMergeThreshold=200个),如gruopByKey(100) -
不需要聚合,但需要排序,SortShuffleWriter(KeyOrdering=true)
按partitionId+Key对Map进行排序,实现方式为:创建一个Array(PartitionedPairBuffer)存放record,并对PartitionedPairBuffer中的元素的Key按<(PID,K),V>进行存储,最后进行排序,并写入磁盘,通过简历索引来标示每个分区。
- 如果Array存放不下,会先进行扩容,如果还存放不下,就将Array中的record排序后spill到磁盘中,再map输出完以后,进行全局排序。
- 该模式优点:只需要一个Array即可排序,并且支持动态扩容,从小到大的数据排序。同时,输出的数据已经按partitionId排序,因此只需要一个分区文件存储,即可标识不同的分区数据,克服了BypassMergeSortShuffleWriter分区过多的缺点
- 缺点:排序时间长耗时。
- 因此该情况适用于分区个数较多情况下,可以对BypassMergeSortShuffleWriter进行优化,将按PID+Key排序改为,按PID排序,即KeyOrdering=false,Partition>spark.Shuffle.sort.bypassMergeThreshold==200的情况。如groupByKey(300)
值得注意的是:
sortByKey的排序是在Shuffle Read完成的,上述这种方法是为了解决分区个数较多导致Buffer较大的问题。当groupByKey和sortByKey的传参分区大于spark.Shuffle.sort.bypassMergeThreshold时,会选择SortShuffleWriter,小于时选择BypassMergeSortShuffleWriter,通过SortShuffleManager控制两种模式的选择。
SortShuffleManager中的registerShuffle方法,调用SortShuffleWriter.shouldBypassMergeSort判断是否用BypassMergeSort,可以看出,当map端需要Combine和dep.partitioner.numPartitions > bypassMergeThreshold时选择SortShuffleWriter,其他选择BypassMergeSort。
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean =
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine)
false
else
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle =
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency))
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
else if (SortShuffleManager.canUseSerializedShuffle(dependency))
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
else
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
- 需要聚合,SortShuffleWriterWithCombine/mapSideCombine = true
通过HashMap(PartitionedAppendOnlyMap)实现,key为PID+Key,Value则为combine的结果。聚合后采用Array排序,如2.如果不需要对key排序,则只按PID排序。先进行聚合,再排序,最后将排序后的record写入一个分区文件。如果HashMap存不下,则先扩容,最后如果还存储不下,则将HashMap中的record排序后spill到磁盘。
- 优点:仅需要一个HashMap结构就可以实现聚合,且支持扩容和spill到磁盘的功能,支持小规模到大规模的数据,也适用于分区较多的情况。
- 缺点:在内存中进行聚合,内存消耗较大。
- 这里的HashMap是被优化和特殊设计的,名为PartitionedAppendOnlyMap,可以同时支持聚合和排序。
- Api:reduceByKey,aggregateByKey
(2)Shuffle Read详解
Shuffle Read的技术和数据结构和Shuffle Write过程非常类似,而且不需要分区。Shuffle Read阶段包含三个功能:跨结点数据获取、聚合和排序。过程中也采用数据结构ExternalAppendOnlyMap和PartitionedPairBuffer。
Shuffle Read也分是否需要排序、聚合三种情况,与Shuffle Write过程类似。不同点有两个:
- 在map task结束后,从reduce task不断获取record,并存放在buffer中(一个buffer大小为spark.reducer.maxSizeInFlight=48MB),而后的操作均从buffer中获取。
- 再经过聚合和排序后,不输出分区或者落磁盘,直接供其他transformation操作
(3)reduceByKey和groupByKey的区别?
- 功能方面:两者均会根据Key进行分组,但是reduceByKey会在groupByKey的基础上,在ShuffleWrite端进行聚合,聚合函数和Read端一致
- 性能方面:groupByKey会处理所有键值对,随后写入不同分区,而reduceByKey则会先对每个分区进行聚合,减少处理数据量,性能相对groupByKey会更好
- Shuffle机制:两者的Shuffle Read过程一致,但是Shuffle Write机制有所区别。groupByKey机制在分区数量过多情况下,如果不修改手动分区个数的话,会采用BypassMergeSortShuffleWriter模式,ShuffleWrite过程中创建过多的Buffer消耗内存,并且每个分区生成一个磁盘文件,生成过多的小文件,消耗资源。而reduceByKey采用SortShuffleManager,用HashMap(类似)存储数据,并对Key排序,且只写出一个磁盘文件。
(4)与Hadoop Mapreduce的Shuffle区别?
- Hadoop Mapreduce强制按照Key进行排序,一方面可以使用最大堆或者最小堆进行聚合,非常高效。但大多数应用不需要严格按照key进行排序,如groupBykey,增加计算量。而Spark则提供了多种排序方式,如按partitionId排序,按key排序,两者结合排序等方式,更加灵活
- Mapreduce的各个阶段都是固定的,读取/输出/处理数据都是固定的操作,实现起来更简单,但是不能支持在线聚合,数据每一步必须都存放在内存或者磁盘中,再进行后续操作。而Spark采用基于HashMap的在线聚合特性,再record插入HashMap时,自动完成聚合过程。
- Mapreduce的过程中产生大量的临时文件,Mapreduce过程中的分区文件个数=M(map task个数)xN(reduce task个数)。而Spark采用灵活的hash+sort based shuffle,根据不同的分区数量,操作采用最合适的Shuffle方法
(5)lineage
Spark中的task是否需要进行Shuffle read,一个stage中的计算步骤是什么,如何确定计算路径上是否有缓存数据。
- Spark采用lineage的数据朔源方法,这个方法核心就是在每个RDD中记录其上游数据是什么,以及当前RDD是如何通过上游RDD计算得到的
- 如果计算链上有缓存数据,则从缓存数据处切断lineage
8. Shuffle
Spark消费Kafka数据
checkpoint机制
SparkSQL
http://hbasefly.com/tag/spark/
SparkStreaming
-
微批次,每个批次计算的数据比较小
-
准实时,每个批次的产生有时间间隔;Spark会在Driver端定期的生成微批次的job并生成task调度到Executor中,因此task的调度也有延迟
-
流式计算,这个Application会一直运算,除非认为停止或出现异常;
-
什么是DStream?
DStream 本质上是一个以时间为键,RDD 为值的哈希表,保存了按时间顺序产生的 RDD,而每个 RDD 封装了批处理时间间隔内获取到的数据。SS 每次将新产生的 RDD 添加到哈希表中,而对于已经不再需要的 RDD 则会从这个哈希表中删除,所以 DStream 也可以简单地理解为以时间为键的 RDD 的动态序列。
SparkStreaming的反压机制
早期版本 Spark 不支持反向压力,但从 Spark 1.5 版本开始,Spark Streaming 也引入了反向压力功能,这是不是正说明了反向压力功能对流计算系统的必要性!默认情况下 Spark Streaming 的反向压力功能是关闭的。当要使用反向压力功能时,需要将 spark.streaming.backpressure.enabled 设置为 true。
整体而言,Spark 的反向压力借鉴了工业控制中 PID 控制器的思路,其工作原理如下。
-
首先,当 Spark 在处理完每批数据时,统计每批数据的处理结束时间、处理时延、等待时延、处理消息数等信息。
-
然后,根据统计信息估计处理速度,并将这个估计值通知给数据生产者。
-
最后,数据生产者根据估计出的处理速度,动态调整生产速度,最终使得生产速度与处理速度相匹配。
SparkStreaming如何保证数据可靠性?
由于流计算系统是长期运行、数据不断流入的,因此其Spark守护进程(Driver)的可靠性是至关重要的,它决定了Streaming程序能否一直正确地运行下去。
Driver实现HA的解决方案就是将元数据持久化,以便重启后的状态恢复。Driver持久化的元数据包括:Block元数据(Receiver从网络上接收到的数据,组装成Block后产生的Block元数据)和Checkpoint数据(包括配置项、DStream操作、未完成的Batch状态、和生成的RDD数据等)
Driver失败重启后:
- 恢复计算:使用Checkpoint数据重启driver,重新构造上下文并重启接收器。
- 恢复元数据块:恢复Block元数据。
- 恢复未完成的作业:使用恢复出来的元数据,再次产生RDD和对应的job,然后提交到Spark集群执行。
SparkStreaming读取Kafka的两种方式
参考:博客地址
- Receiver-based Approach
基于receiver的方法采用Kafka的高级消费者API,每个executor进程都不断拉取消息(每个worker启动应该receiver进程),并同时保存在executor内存与HDFS上的预写日志(write-ahead log/WAL)。当消息写入WAL后,自动更新ZooKeeper中的offset。
它可以保证at least once语义,但无法保证exactly once语义。原因是虽然引入了WAL来确保消息不会丢失,但有可能会出现消息已写入WAL,但更新comsuer 的offset到zk时失败的情况,此时consumer就会按上一次的offset重新发送消息到kafka重新获取一次已保存到WAL的数据。这种方式还会造成数据冗余(WAL中一份,blockmanager中一份,其中blockmanager可能会做StorageLevel.MEMORY_AND_DISK_SER_2,即内存中一份,磁盘上两份),大大降低了吞吐量和内存磁盘的利用率。
Receiver模式数据接收流程 :
当执行 SS 的 start 方法后,SS 会标记 StreamingContext 为 Active 状态,并且单独起个线程通过ReceiverTracker 将从ReceiverInputDStreams 中获取的 receivers 以并行集合的方式分发到 worker 节点,并运行他们。worker 节点会启动ReceiverSupervisor。接着按如下步骤处理:
- ReceiverSupervisor 会启动对应的 Receiver(这里是 KafkaReceiver)
- KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用ReceiverSupervisor.pushSingle 方法填充数据,注意,这里是一条一条填充的。
- ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。
Receiver存在的问题:
worker节点中 exeutor线程里的 receiver接口会一直消费kafka中的数据,如果spark集群定义了每个worker使用的cpu资源不足以消费完了这5秒的数据,那么就会出现数据的丢失,消费不了的那些数据就没了,并且streaming一经启动会一直循环消费拉取资源,如果出现上述问题,分配的cpu不足以消费5秒拉取的数据,那么丢失的数据便会越积越多,这在程序里是严重的bug。
此时则必须要通过Wal方式把日志偏移量存到hdfs上面做备份,防止数据丢失,但是这样会影响性能。
- Direct Approach (No Receivers)
基于direct stream的方法采用Kafka的简单消费者API,大大简化了获取message 的流程。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,允许用户控制topic-partition 的offset,程序变得更加可控。**driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。**由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,保证了exactly once语义。不过,由于它采用了简单消费者API,我们就需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复,这肯定是不稳妥的。
直连方式就是使用executor直接连接kakfa分区,我们自定义偏移量的使用大小及存储备份方法
- 直连方式从Kafka集群中读取数据,并且在Spark Streaming系统里面维护偏移量相关的信息,实现零数据丢失,保证不重复消费,比createStream更高效;
- 创建的DStream的rdd的partition做到了和Kafka中topic的partition一一对应。
Direct模式相比Receiver模式的优点:
- 降低资源。Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
- 降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
- 鲁棒性更好。Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败
Spark消费Kafka如何保证精准一次?
- Spark如何接收 kafka 的消息的?
通过KafkaRDD来获取单批次的数据的,KafkaRDD的compute方法返回一个迭代器,这个迭代器封装了kafka partition数据的批量抓取以及负责调用传入的消息处理回调函数并将单条处理结果返回。
其中,spark streaming 的exactly-once 消费机制是通过 KafkaRDD 来保证的,在创建KafkaRDD之前,就已经通过 currentOffset和 估算出的速率,以及每个分区的自定义最大抓取速率,和从partition的leader获取的最大offset,确定分区untilOffset的值,最终fromOffset和untilOffset构成OffsetRange,在KafkaRDD中生成的迭代器中会丢弃掉offset不在该OffsetRange内的数据,最终调用用户传入的消息处理函数,处理数据成用户想要的数据格式。
A batch-oriented interface for consuming from Kafka.Starting and ending offsets are specified in advance,
so that you can control exactly-once semantics.
从kafka 消费的针对批处理的API,开始和结束 的 offset 都提前设定了,所以我们可以控制exactly-once 的语义。
- 这个类是如何将单个partition的消息转换为 RDD单个partition的数据的?
KafkaRDD 的compute 方法 以 partition 作为参数,这个partition是 KafkaRDDPartition 的实例, 包含了分区消息的 offset range,topic, partition 等信息,该方法会返回一个KafkaRDDIterat,该类提供了访问 该分区内kafka 的 数据,内部通过SimpleConsumer 来从leader 节点来批量获取数据,然后再从批量数据中获取我们想要的数据(由offset range来保证)。 - 这个类是如何估算 kafka 消费速率的?
提供了 PIDRateEstimator 类, 该类通过传入batch 处理结束时间,batch 处理条数, 实际处理时间和 batch 调度时间来估算速率的。
广播机制
https://mp.weixin.qq.com/s/urA3S1zdxyGIU-ZyDS1aJA
https://zhuanlan.zhihu.com/p/161963838
Spark调优怎么做
spark调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等
3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
数据本地性问题
Spark中的数据本地性有三种:
1)PROCESS_LOCAL是指读取缓存在本地节点的数据
2)NODE_LOCAL是指读取本地节点硬盘数据
3)ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。
Spark join问题
Spark内存管理机制
- Spark内存消耗得来源?
- 框架本身在处理数据时需要消耗内存,如Shuffle阶段使用的HashMap和Array的数据结构
- 数据缓存,用户将重复使用的数据缓存到内存中避免重复计算
- 用户代码消耗的内存,这部分的实际内存使用量难以被估计
- Spark内存消耗在微观上是指task线程的内存消耗,宏观上指的Executor JVM的内存消耗
- Spark中多个Task以线程方式运行在一个Exector JVM中,task之间存在共享内存和内存竞争问题。
- Shuffle过程中会动态监控HashMap等数据结构的大小,动态调整数据结构长度,并在内存不足时spil到磁盘
静态内存管理模型
Spark1.6之前的版本采用静态内存管理模型,将内存空间划分为三个分区:
- 数据缓存空间:约占60%的内存空间,用于存储广播变量、缓存数据、task的计算结果等
- 框架执行空间:约占20%的内存空间,由于存储Shuffle机制中的中间数据
- 用户代码空间:约占20%的内存空间,用于存储用户代码中的运算结果,Spark产生的内部对象,以及JVM自身的一些内存对象
这种内存管理模型的优点是: 各个分区的职责分明,实现简单
缺点:分区中间存在”硬“界限,难以平衡三者的内存消耗
统一内存管理模型
自从1.6版本开始,为了平衡用户代码、Shuffle机制中的中间数据,以及数据缓存的内存空间需求,Spark提出统一内存管理模型,为三者分配一定的内存配额,并在运行时根据三者的实际内存用量,动态调整配额比例。三者当中,Shuffle的中间数据和缓存数据的内存消耗可以被监控,但用户代码的内存很难被监控和估计。所以统一内存管理模型中,优化思想主要是根据监控的内存使用总量,来动态调节Shuffle机制和缓存数据内存空间的,并为每个内存消耗来源设置一个上下界,其内存配额在上下界范围内动态可调。
-
统一内存管理模型将内存依旧划为三个分区:数据缓存空间、框架执行空间和用户代码空间
-
与静态内存管理模型不同点:统一内存管理模型使用软边界调整分区的占用比例
-
数据缓存空间**(Storage Memory)和框架执行空间(Execution Memory)组成一个大的空间,称为Framework memory**
-
Framework memory 大小固定,为缓存空间和执行空间设置了初始比例,但可以动态调整,如框架执行空间不足时可以借用数据存储空间来存储Shuffle中间数据,同时二者比例也有上下界,避免一方被另一方完全占用。总大小为spark.memory.faction(default 0.6)*(heap-Reserved Memory) 约60%的内存空间,缓存空间和执行空间相互借用内存,均至少要保证二者具有约50%左右的空间
-
用户代码空间被设为固定大小,原因是难以在运行时回去用户真实内存消耗。默认为40%的内存空间
-
框架执行空间不足时,会将Shuffle数据spill到磁盘;
-
数据缓存空间不足时,Spark会进行缓存替换、移除缓存数据等操作。
-
为了限制每个Task的内存使用,为了解决内存共享和竞争问题,也会对每个task的内存使用进行限额,每个task可使用的内存空间被均分,每个task的空间被控制在**[1/2N,1/N]Execution Memory* N是当前Task数目。堆外内存同理
-
系统保留空间(Reserved Memory) 除了上述三组空间外,系统保留内存使用较小的空间存储Spark框架产生的内部
以上是关于Spark核心机制总结的主要内容,如果未能解决你的问题,请参考以下文章