Spark面试突击
Posted 小朱小朱绝不服输
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark面试突击相关的知识,希望对你有一定的参考价值。
大数据方面的面试总结汇总,本篇为Spark的面试总结。
文章目录
- 一、Spark基础
- 二、Spark Core
- 1. 什么是RDD?RDD的五大属性是什么?
- 2. 说一些常用的 RDD 算子?
- 3. RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么?
- 4. 介绍一下 cogroup RDD 实现原理,你在什么场景下用过这个 RDD?
- 5. RDD 的宽窄依赖了解吗?简述Spark的宽窄依赖?
- 6. RDD的持久化原理
- 7. 你刚才提到了 DAG,能说一下什么是 DAG?
- 8. DAG 中为什么要划分 Stage?如何划分 DAG 的 stage?DAG 划分为 Stage 的算法了解吗?
- 9. Spark 广播变量和累加器介绍一下?区别是啥?
- 10. Checkpoint 检查点机制?
- 11. Checkpoint 和持久化机制的区别?
- 三、Spark Streaming
- 四、Spark SQL
- 五、Spark 数据倾斜
- 六、Spark调优
一、Spark基础
1. 你是怎么理解Spark,它的特点是什么?
Spark 是一个通用分布式内存计算引擎。
Spark是一个基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。
它内部的组成模块,包含SparkCore,SparkSQL,SparkStreaming,SparkMLlib,SparkGraghx等。
SparkCore
: 核心部分,包含Spark基本功能(任务调度 内存管理 容错机制等)SparkSQL
: Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL 操作数据。SparkStreaming
: Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。SparkMLib
:提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。SparkGraphX
: Spark 中用于图计算的 API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。SparkManagers
:集群管理 (HadoopYARN、ApacheMesos、Spark自带的单独调度器)
Spark特点:
运行速度快
:Spark 拥有 DAG 执行引擎,支持在内存中对数据进行迭代计算。如果数据由磁盘读取,速度是 Hadoop MapReduce 的 10 倍以上,如果数据从内存中读取,速度可以高达 100 倍。易用性好
:Spark 不仅支持 Scala 编写应用程序,而且支持 Java 和 Python 等语言进行编写。通用性强
:Spark 生态圈即 BDAS(伯克利数据分析栈)包含了 Spark Core、Spark SQL、Spark Streaming、MLLib 和 GraphX 等组件。兼容性
:Spark 具有很强的适应性,能够读取 HDFS、Cassandra、HBase、S3 和 Techyon 为持久层读写原生数据,能够以 Mesos、YARN 和自身携带的 Standalone 作为资源管理器调度 job,来完成 Spark 应用程序的计算。
2. Spark架构了解吗?
Spark 架构主采用 Master/Worker 主从架构
进行设计,由以下几部分组成:
- 主节点
Master
/ 资源管理 Yarn Application Master - 工作节点
Worker
/ Node Manager - 任务调度器
Driver
- 任务执行器
Executor
3. 简述Spark的作业提交流程
Spark的任务提交方式实际上有两种,分别是YarnClient模式
和YarnCluster模式
。
-
YarnClient 运行模式介绍
-
第一步,Driver端在任务提交的本地机上运行
-
第二步,Driver启动之后就会和ResourceManager通讯,申请启动一个ApplicationMaster
-
第三步,ResourceManager就会分配container容器,在合适的nodemanager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
-
第四步,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
-
第五步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
-
第六步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
-
YarnCluster 模式介绍
-
第一步,在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster
-
第二步,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
-
第三步,Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
-
第四步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,
-
第五步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
两种作业提交流程的区别:
两种调度模式,最大的区别在于Spark的Driver进程被放在哪里
。
-
Yarn Client
模式的特点是Driver被放在Client系统上,也就是任务提交的系统
。优点:能够很方便的获取Spark代码和SQL脚本中的返回信息。比如在Spark代码中打印日志,或者在Spark Sql脚本中执行了Select等,都可以非常方便的track到。对于问题定位有很好的帮助。
缺点:如果有多个Driver进程在任务提交的系统中运行,会拖慢当前系统的性能,和占用太多当前系统的内存和CPU等资源。且在当前系统因为资源限制中没法启动更多的Driver。也就没法提交更多的spark任务。
-
Yarn Cluster
模式下,Yarn会选中集群中随机一台机器,来启动Driver进程。优点:Client环境提交Spark任务不受到Client本身资源的限制;
缺点:对于日志查看等不太方便。
Driver会和Executors进行通信,这也导致了Yarn-Cluster在提交App之后可以关闭Client,而Yarn-Client不可以。
最后再来说应用场景,Yarn-Cluster适合生产环境,Yarn-Client适合交互和调试。
4. Spark的运行流程?
Spark 运行流程 具体运行流程如下:
- SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
- 资源管理器分配 Executor,然后资源管理器启动 Executor
- Executor 发送心跳至资源管理器
SparkContext 构建 DAG 有向无环图
将 DAG 分解成 Stage(TaskSet)
把 Stage 发送给 TaskScheduler
Executor 向 SparkContext 申请 Task
TaskScheduler 将 Task 发送给 Executor 运行
同时 SparkContext 将应用程序代码发放给 Executor
- Task 在 Executor 上运行,运行完毕释放所有资源
5. Spark与Hadoop对比?Spark为什么比MapRedude快?
注意:
- 尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,Spark 主要用于替代 Hadoop 中的 MapReduce 计算模型。存储依然可以使用 HDFS,但是中间结果可以存放在内存中;调度可以使用 Spark内置的,也可以使用更成熟的调度系统 YARN 等。
- 实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于HDFS 实现分布式存储。
- 此外,Hadoop 可以使用廉价的、异构的机器来做分布式存储与计算,但是,Spark 对硬件的要求稍高一些,对内存与 CPU有一定的要求。
主要区别:
- Spark把运算的
中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高
,mapreduce的中间结果需要落地,保存到磁盘。 - Spark
容错性高,它通过弹性分布式数据集RDD来实现高效容错
,RDD是一组分布式的存储在节点内存中的只读性的数据集,这些集合是弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算。 - Spark
更通用,提供了transformation和action这两大类的多功能api
,另外还有流式处理sparkstreaming模块、图计算等等,mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏。 - Spark
框架和生态更为复杂
,有RDD,血缘lineage、执行时的有向无环图DAG,stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行。 - Spark计算框架
对内存的利用和运行的并行度比mapreduce高
,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask.程序运行并行度高。 - Spark
对于executor的优化,在JVM虚拟机的基础上对内存弹性利用
:storage memory与Execution memory的弹性扩容,使得内存利用效率更高。
Spark 与 MapReduce 相比,Spark 运行效率更高。请说明效率更高来源于 Spark 内置的哪些机制?
(也可以参考上面的回答)
- 基于内存计算,减少低效的磁盘交互;
- 高效的调度算法,基于 DAG;
- 容错机制 Linage。
重点部分就是 DAG
和 Lingae
hadoop和spark使用场景?
Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析
,但Hadoop特别适合是单次分析的数据量“很大”的情景
,而Spark则适用于数据量不是很大的情景
。
- 一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会“很大”,因此可以优先考虑使用Spark。
- 业务通常认为Spark更适用于机器学习之类的“迭代式”应用,80GB的压缩数据(解压后超过200GB),10个节点的集群规模,跑类似“sum+group-by”的应用,MapReduce花了5分钟,而spark只需要2分钟。
Spark 解决了 Hadoop 的哪些问题?
-
MR
:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;Spark
:Spark 采用 RDD 计算模型,简单容易上手。 -
MR
:只提供 map 和 reduce 两个操作,表达能力欠缺;Spark
:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等; -
MR
:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job之间的管理以来需要开发者自己进行管理;Spark
:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行; -
MR
:中间结果存放在 hdfs 中;Spark
:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs; -
MR
:只有等到所有的 map task 执行完毕后才能执行 reduce task;Spark
:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。 -
MR
:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;Spark
:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。
6. Spark Master HA 主从切换过程不会影响到集群已有作业的运行,为什么?
不会的。因为程序在运行之前,已经申请过资源了,driver 和 Executors 通讯,不需要和 master 进行通讯的。
7. Spark Master 使用 Zookeeper 进行 HA,有哪些源数据保存到 Zookeeper 里面?
spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 节点要从 zk 中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。
注:Master 切换需要注意 2 点:
- 在 Master 切换的过程中,所有的已经在运行的程序皆正常运行!因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源,所以在运行时 Job 本身的 调度和处理和 Master 是没有任何关系。
- 在 Master 的切换过程中唯一的影响是不能提交新的 Job:一方面不能够提交新的应用程序给集群, 因为只有 Active Master 才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因 Action 操作触发新的 Job 的提交请求。
8. Spark 主备切换机制原理知道吗?
Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。
Spark Master 主备切换可以基于两种机制,
- 一种是基于
文件系统
的,基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上; - 一种是基于
ZooKeeper
的,而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。
9. Spark 与 MapReduce 的 Shuffle 的区别?
相同点:
- 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是ResultTask)
不同点:
- MapReduce 默认是排序的,spark 默认不排序,除非使用 sortByKey 算子。
- MapReduce 可以划分成split,map()、spill、merge、shuffle、sort、reduce()等阶段,spark没有明显的阶段划分,只有不同的 stage 和算子操作。
- MR 落盘,Spark 不落盘,spark 可以解决 mr 落盘导致效率低下的问题。
10. spark如何保证宕机迅速恢复?
- 适当增加spark standby master
- 编写shell脚本,定期检测master状态,出现宕机后对master进行重启操作
二、Spark Core
1. 什么是RDD?RDD的五大属性是什么?
概念:RDD(Resilient Distributed Dataset)
叫做弹性分布式数据集
,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
作用:RDD提供了一个抽象的数据模型,将具体的应用逻辑表达为一系列转换操作(函数)。另外不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)。
属性:RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:
- 分区列表
- 计算函数
- 依赖关系
- 分区函数(默认是 hash)
- 最佳位置
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;
计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。
2. 说一些常用的 RDD 算子?
Spark处理时分为两种操作,一种转换 transformation 操作、一种动作 action 操作
transformation
操作常用算子如下:
Map、MapPartitions、FlatMap、Filter、distinct、sortBy、union、reduceByKey、groupByKey、sortByKey、join
action
操作常用算子如下:
reduce、collect、count、save、take、aggregate、countByKey等。
有哪些会引起Shuffle过程的Spark算子呢?
- reduceByKey
- groupByKey
- …ByKey
3. RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么?
reduceByKey
:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在 本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于, 在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。groupByKey
:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列(Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。
所以在进行大量数据的 reduce 操作
时候建议使用 reduceByKey。不仅可以提高 速度,还可以防止使用 groupByKey 造成的内存溢出问题。
4. 介绍一下 cogroup RDD 实现原理,你在什么场景下用过这个 RDD?
cogroup
:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分 别聚合成一个集合。- 与
reduceByKey
不同的是:reduceByKey 针对一个 RDD 中相同的 key 进行合并。 而 cogroup 针对多个 RDD 中相同的 key 的元素进行合并。 cogroup 的函数实现
:这个实现根据要进行合并的两个 RDD 操作,生成一个 CoGroupedRDD 的实例,这个 RDD 的返回结果是把相同的 key 中两个 RDD 分别 进行合并操作,最后返回的 RDD 的 value 是一个 Pair 的实例,这个实例包含 两个 Iterable 的值,第一个值表示的是 RDD1 中相同 KEY 的值,第二个值表 示的是 RDD2 中相同 key 的值。- 由于做 cogroup 的操作,需要通过 partitioner 进行重新分区的操作,因此, 执行这个流程时,需要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都已经是 shuffle 后的 rdd,同时他们对应的 partitioner 相同时,就不需 要执行 shuffle)。
场景
:表关联查询或者处理重复的 key。
5. RDD 的宽窄依赖了解吗?简述Spark的宽窄依赖?
一个作业从开始到结束的计算过程中产生了多个 RDD,RDD 之间是彼此相互依赖的,这种父子依赖的关系称之为“血统
”。
- 如果父 RDD 的每个分区最多只能被子 RDD 的一个分区使用,称之为
窄依赖(一对一)
。 - 若一个父 RDD 的每个分区可以被子 RDD 的多个分区使用,称之为
宽依赖(一对多)
。
为什么要设计宽窄依赖?
- 对于
窄依赖
: 窄依赖的多个分区可以并行计算; 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就 可以了。 - 对于
宽依赖
: 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能 计算下一阶段。
6. RDD的持久化原理
spark非常重要的一个功能特性就是可以将RDD持久化在内存中
。
调用cache()和persist()方法即可
。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。- cache()和persist()的
区别
在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。 - 如果需要从内存中清除缓存,可以使用
unpersist()
方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。
7. 你刚才提到了 DAG,能说一下什么是 DAG?
DAG(Directed Acyclic Graph 有向无环图)
指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);
原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
DAG 的边界
- 开始:通过 SparkContext 创建的 RDD;
- 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
8. DAG 中为什么要划分 Stage?如何划分 DAG 的 stage?DAG 划分为 Stage 的算法了解吗?
为什么划分?
并行计算。
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个DAG 划分成多 个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline流水线,流水线内的多个平行的分区可以并行执行。
如何划分?
- 对于
窄依赖
,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽 量放在在同一个 stage 中,可以实现流水线计算)。 - 对于
宽依赖
,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接 下来的计算,也就是说需要要划分 stage。
划分算法?
回溯算法
:从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后 一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那 么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。 然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。
9. Spark 广播变量和累加器介绍一下?区别是啥?
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
累加器 accumulators
:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。广播变量 broadcast variables
:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
10. Checkpoint 检查点机制?
应用场景
:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使 用 checkpoint 功能。原因
:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一 次数据。- Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。 之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
- 检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以 使 spark streaming阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中, 以供恢复时使用。
具体来说基于以下两个目的服务:
- 控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的 谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
- 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重 启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就 可以读取之前运行的程序处理数据的进度,并从那里继续。
11. Checkpoint 和持久化机制的区别?
- 最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系)是不变的。但是checkpoint 执行完之后,rdd 已 经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。
- 持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。 但是 checkpoint的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所 以数据丢失可能性比较低
三、Spark Streaming
1. 什么是 Spark Streaming?
Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。
2. Spark Streaming 以及基本工作原理?
- Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞 吐量、容错的实时数据流的处理。
- 它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket, 并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后 的数据可以保存到文件系统、数据库等存储中。
- Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆 分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交 给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也 是一个一个的 batch 组成的。
3. Spark Streaming 如何执行流式计算的?
Spark Streaming 中的流式计算其实并不是真正的流计算,而是微批计算
。Spark Streaming 的 RDD 实际是一组小批次的 RDD 集合,是微批(Micro-Batch)的模型
,以批为核心。
Spark Streaming 在流计算实际上是分解成一段一段较小的批处理数据(Discretized Stream),其中批处理引擎使用 Spark Core,每一段数据都会被转换成弹性分布式数据集 RDD,然后 Spark Streaming 将对 DStream 的转换操作变为 Spark 对 RDD 的转换操作,并将转换的中间结果存入内存中,整个流式计算依据业务的需要可以对中间数据进行叠加。
4. DStream 以及基本工作原理?
- DStream 是 spark streaming 提供的一种高级抽象,代表了一个持续不断的数据流。
- DStream 可以通过输入数据源来创建,比如 Kafka、flume 等,也可以通过其他 DStream 的高阶函数来创建,比如 map、reduce、join 和 window 等。
- DStream 内部其实不断产生 RDD,每个 RDD 包含了一个时间段的数据。
- Spark streaming 一定是有一个输入的 DStream 接收数据,按照时间划分成一个一个的 batch,并转化为一个RDD,RDD 的数据是分散在各个子节点的 partition 中。
5. Spark Streaming 整合 Kafka 的两种模式?
receiver 方式
:
- 将数据拉取到 executor 中做操作,若数据量大,内存 存储不下,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费 的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失。
receiver 方式的容错性
:在默认的配置下,这种方式可能会因为底层的 失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会 同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预 写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数 据进行恢复。- Kafka 中的 topic 的 partition,与 Spark 中的 RDD 的 partition 是 没有关系的。在 1、KafkaUtils.createStream()中,提高 partition 的 数量,只会增加 Receiver 方式中读取 partition 的线程的数量。不会 增加 Spark 处理数据的并行度。 可以创建多个 Kafka 输入 DStream, 使用不同的 consumer group 和 topic,来通过多个 receiver 并行接收 数据。
基于 Direct 方式
:
- 使用 Kafka 底层 Api,其消费者直接连接 kafka 的分 区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每 个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己 维护偏移量,即用即取,不会给内存造成太大的压力,效率高。
优点
:简化并行读取:如果要读取多个 partition,不需要创建多个输入 DStream 然后对它们进行 union 操作。Spark 会创建跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。 所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射 关系。高性能
:如果要保证零数据丢失,在基于 receiver 的方式中,需要开启 WAL 机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka 自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 direct 的方式,不依赖 Receiver,不需要开启 WAL 机 制,只要 Kafka 中作了数据的复制,那么就可以通过 Kafka 的副本进行 恢复。
receiver 与和 direct 的比较
:
- 基于 receiver 的方式,是使用 Kafka 的高阶 API 来在 ZooKeeper 中 保存消费过的 offset 的。这是消费 Kafka 数据的传统方式。这种方式 配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据 被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间 可能是不同步的。
- 基于 direct 的方式,使用 Kafka 的低阶 API,Spark Streaming 自己 就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定 是同步的,因此可以保证数据是消费一次且仅消费一次。
- Receiver 方式是通过 zookeeper 来连接 kafka 队列,Direct 方式是直 接连接到 kafka 的节点上获取数据。
四、Spark SQL
1. Spark SQL 执行的流程?
这里简单介绍下总体流程:
- parser:基于 antlr 框架对 sql 解析,生成抽象语法树。
- 变量替换:通过正则表达式找出符合规则的字符串,替换成系统缓存环境的变量。
- parser:将 antlr 的 tree 转成 spark catalyst 的 LogicPlan,也就是未解析的逻辑计划;详细参考AstBuild, ParseDriver。
- analyzer:通过分析器,结合 catalog,把 logical plan 和实际的数据绑定起来,将未解析的逻辑计划生成逻辑计划;详细参考QureyExecution。
- 缓存替换:通过 CacheManager,替换有相同结果的 logical plan(逻辑计划)
- logical plan 优化,基于规则的优化;优化规则参考 Optimizer,优化执行器 RuleExecutor
- 生成 spark plan,也就是物理计划;参考QueryPlanner和SparkStrategies
- spark plan 准备阶段
- 构造 RDD 执行,涉及 spark 的 wholeStageCodegenExec 机制,基于 janino 框架生成 java代码并编译
2. Spark SQL 是如何将数据写到 Hive 表的?
- 方式一:是利用 Spark RDD 的 API 将数据写入 hdfs 形成 hdfs 文件,之后再将 hdfs 文件和 hive表做加载映射。
- 方式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将 DataFrame 写成缓存表,最后利用Spark SQL 直接插入 hive 表中。而对于利用 Spark SQL 写 hive 表官方有两种常见的 API,第一种是利用JavaBean 做映射,第二种是利用 StructType 创建 Schema 做映射。
3. 你用sparksql处理的时候, 处理过程中用的dataframe还是直接写的sql?为什么?
这个问题的宗旨是问你spark sql 中dataframe和sql的区别,从执行原理、操作方便程度和自定义程度
来分析 这个问题。
- DataFrame = RDD - 泛型 + Schema + SQL + 优化
- DataSet = DataFrame + 泛型
- DataSet = RDD + Schema + SQL + 优化
五、Spark 数据倾斜
1. 对于 Spark 中的数据倾斜问题你有什么好的方案?
- 前提是定位数据倾斜,是
OOM
了,还是任务执行缓慢
,看日志,看 WebUI - 解决方法,有多个方面:
- 避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
- 分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
- 改变并行度,可能并行度太少了,导致个别 task 数据压力大
- 两阶段聚合,先局部聚合,再全局聚合
- 自定义 paritioner,分散 key 的分布,使其更加均匀
2. Spark 中的 OOM 问题?
-
1.
map 类型的算子执行中内存溢出如 flatMap,mapPatitions
- 原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的对象导致的针对这种问题。
- 解决方案:
- 增加堆内内存。
- 在不增加内存的情况下,可以减少每个 Task 处理数据量,使每个 Task 产生大量的对象时,Executor的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition 方法,分区成更小的块传入 map。
-
2.
shuffle 后内存溢出如 join,reduceByKey,repartition
。- shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写partitioner 代码了.
-
3.
driver 内存溢出
-
用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解决方案:将大对象转换成 Executor 端加载,比如调用sc.textfile 或者评估大对象占用的内存,增加 dirver 端的内存。
-
从 Executor 端收集数据(collect)回 Dirver 端,建议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。
-
3. Spark 中数据的位置是被谁管理的?
每个数据分片都对应具体物理位置,数据的位置是被 blockManager
管理,无论 数据是在磁盘,内存还是 tacyan,都是由 blockManager 管理。
4. Spark 程序执行,有时候默认为什么会产生很多 task,怎么修改默认 task 执行个数?
- 输入数据有很多 task,尤其是有很多小文件的时候,有多少个输入 block 就会有多少个 task 启动;
- spark 中有 partition 的概念,每个 partition 都会对应一个 task, task 越多,在处理大规模数据的时候,就会越有效率。不过 task 并不 是越多越好,如果平时测试,或者数据量没有那么大,则没有必要 task 数 量太多。
- 参数可以通过
spark_home/conf/spark-default.conf
配置文件设置:
针对 spark sql 的 tas量:spark.sql.shuffle.partitions=50
非 spark sql 程序设置生效:spark.default.parallelism=10
六、Spark调优
1. 能介绍下你所知道和使用过的Spark调优吗?
资源参数调优
- num-executors:设置Spark作业总共要用多少个Executor进程来执行
- executor-memory:设置每个Executor进程的内存
- executor-cores:设置每个Executor进程的CPU core数量
- driver-memory:设置Driver进程的内存
- spark.default.parallelism:设置每个stage的默认task数量
…
开发调优
-
避免创建重复的RDD
-
尽可能复用同一个RDD
-
对多次使用的RDD进行持久化
-
尽量避免使用shuffle类算子
-
使用map-side预聚合的shuffle操作
-
使用高性能的算子
①使用reduceByKey/aggregateByKey替代groupByKey
②使用mapPartitions替代普通map
③使用foreachPartitions替代foreach
④使用filter之后进行coalesce操作
⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
参考:
Spark面试干货总结!(8千字长文、27个知识点、21张图)
以上是关于Spark面试突击的主要内容,如果未能解决你的问题,请参考以下文章