Spark相关总结
Posted 马晟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark相关总结相关的知识,希望对你有一定的参考价值。
1.大数据处理框架
1.1 四层结构
大数据处理框架一般可以大致分为四层结构:
- 用户层
- 分布式数据并行处理层
- 资源管理与任务调度层
- 物理执行层
下面分别就四层进行详细解释。
1.1.1 用户层
主要包括:
- 输入数据:考虑数据如何高效读取(减少磁盘I/O)、批式和流式不同的读取方式等
- 用户代码
- 配置参数:分为资源相关的配置参数和数据相关的配置参数
1.1.2 分布式数据并行处理层
将用户提交的任务转换成较小的计算任务,然后通过调度层实现并行执行。
Spark中应用转换过程包含两个阶段:
- 逻辑处理流程:构建DAG,其中的节点是RDD
- 执行阶段和执行任务划分:对逻辑处理流程进行划分,生成物理执行计划,包含多个stage(根据RDD之间的依赖关系划分),每个stage包含多个task(一般与分区数一致)
1.1.3 资源管理和任务调度层
大数据框架一般都是Master-Slave结构,主节点负责接收用户的应用提交,处理请求,管理整个应用的生命周期。从节点主要负责任务的执行,即具体的数据处理,同时执行过程中向主节点汇报任务的执行状态。
典型的任务调度器包括FIFO(包括应用调度器和任务调度器,分别决定多个应用和多个任务的执行顺序)调取器和Fair调度器。
1.1.4 物理执行层
负责启动task,执行每个task的数据处理步骤。在MR中,每个task对应一个进程,而在Spark中,每个task对应JVM中的一个线程,task共享JVM内存空间。
2.Spark基本架构
2.1 安装部署
Spark的部署方式主要有以下几种:
- Standalone
- Mesos
- YARN
- Kubernetes
2.2 系统架构
- Master节点:常驻Master进程,负责管理应用和任务,收集任务运行信息,监控Worker节点存活状态等
- Worker节点:常驻Worker进程,负责执行任务,与Master节点通信,每个Worker进程上存在一个或多个ExecutorRunner对象,每个ExecutorRunner对象管理一个Executor,Executor持有一个线程池,每个线程执行一个task
- Appcation:一个可运行的Spark程序
- Driver:运行Spark应用中main()函数并且创建SparkContext的进程,独立于Master进程
- Executor:Spark计算资源的一个单位,是一个JVM进程,Spark以Executor为单位占用集群资源,然后将具体的计算任务分配给Executor执行(Worker进程只负责启停和观察Executor的执行情况)
- task:Spark的计算任务,Driver运行Spark应用的main()函数时,将应用拆分为多个计算任务,分配给多个Executor执行。task是Spark中最小的计算单位,以线程方式运行在Executor进程中,Executor的内存空间由多个task共享
3.Spark逻辑处理流程
Spark在运行应用程序前,首先需要将应用程序转化为逻辑处理流程(Logical Plan),主要包含四部分:
- 数据源:包括本地文件系统和分布式文件系统,还可以是内存数据结构、网络流等
- 数据模型:Spark将输入/输出、中间数据抽象为统一的数据模型RDD,其与普通的数据结构主要区别有两点,一是RDD只是一个逻辑概念,在内存中并不会为其分配存储空间,除非该RDD需要被缓存,二是RDD可以包含多个数据分区,不同数据分区可以由不同的task在不同的节点进行处理
- 数据操作:Spark中将数据操作主要分为transformation()和action()操作,action操作触发Spark提交job真正执行任务
- 计算结果处理:分为两种方式,一种是直接将计算结果存放到分布式文件系统中,这种方式一般不需要在Driver端进行集中计算,另一种方式是需要在Driver端进行集中计算
3.1 Spark逻辑处理流程生成
Spark需要有一套通用的方法来将应用程序转化为确定性的逻辑处理流程,需要考虑三个问题。
3.1.1 如何产生RDD以及产生什么样的RDD
需要多种类型的RDD来表示不同数据类型、不同计算逻辑以及不同数据以来的RDD,Spark实际产生的RDD类型和个数与transformation()的计算逻辑有关
3.1.2 如何建立RDD之间的数据依赖关系
包含两方面:RDD之间的依赖关系和RDD自身分区之间的依赖关系
主要包括三个问题:
- RDD之间的依赖关系如何建立:对于一元操作,子RDD只依赖父RDD,二元操作,子RDD同时依赖多个父RDD,二元以上的类比
- 新生成的RDD应该包含多少分区:分区数由用户和父RDD共同决定,用户可以指定分区个数,如果不指定,取父RDD的分区个数最大值
- 分区之间的依赖关系:分为窄依赖和宽依赖
窄依赖
- 一对一:map()、filter()
- 范围依赖:union()
- 多对一:join()、cogroup()
- 多对多:cartesian()
宽依赖
表示新生成的子RDD中的分区依赖父RDD中的每个分区的一部分,和窄依赖的区别在于:子RDD的各个分区是否完全依赖父RDD的一个或多个分区,如果父RDD中的一个分区中的数据全部流入子RDD的一个或者多个分区,则是窄依赖,如果一部分流入RDD的一个分区,一部分流入另一个分区,则是宽依赖
3.1.3 如何计算RDD中的数据
确定数据依赖关系之后,使用transformation(func)处理每个分区的输入数据,将生成的数据再推送到子RDD中对应的分区即可
3.2 常用的transformation算子
- map:对每条记录进行处理,输出一条新的record
- mapValues:对k,v类型的record,对其value进行处理,输出新的record
- filter:对record进行过滤,保留判断结果为true的record
- filterByRange:保留lower和upper之间的record(判断key)
- flatMap:对每个元素进行操作,得到新元素,然后将所有元素组合
- flatMapValues:只针对value进行操作
- sample:对数据进行抽样
- mapPartitions:对每个分区进行操作,输出一组新的数据(该操作可以用来实现数据库操作)
- mapPartitionsWithIndex:分区中的数据带有索引,表示record属于哪个分区
- partitionBy:使用新的分区器进行重新分区
- groupByKey:将<k,v>结构按照key聚合在一起,形成<k,CompactBuffer(v)>结构(会进行shuffle操作)
- reduceByKey:与groupByKey类似,包括两步聚合,首先对RDD中每个分区的数据进行一个本地化的combine,然后执行reduce操作,不形成新的RDD,然后生成新的ShuffledRDD,将RDD1中不同分区且具有相同key的数据聚合在一起,再进行一次reduce操作,该算子只能对record一个一个连续处理,中间计算结构必须和value是同一类型,效率相比groupByKey更高
- aggregateByKey:由于reduceByKey算子灵活性较低,所以需要定义一个通用的聚合算子
- combineByKey:该算子的createCombiner是一个初始化函数,相比aggregateByKey包含的zeroValue是一个值,功能更加强大
- foldByKey:简化的aggregateByKey,seqOp和combineOp共用一个func,功能介于aggregateByKey和reduceByKey之间
- coGroup:将多个RDD中具有相同key的value聚合在一起
- join:将两个RDD中的数据关联在一起
- cartesian:计算两个RDD的笛卡尔积
- sortByKey:对RDD<k,v>中record按照key进行排序(如何对value进行排序?二次排序或者先使用groupByKey将数据聚合,再使用mapValues对value进行排序)
- coalesce:改变RDD的分区数,可以选择true或false决定是否进行shuffle
- repartition:相当于参数为true的coalesce
- repartitionAndSortWithinPartitions:类似于repartition,可以使用各种分区器,对RDD2中的每个分区按照key进行排序,比repartiiton+sortByKey效率高
- intersection:求两个RDD的交集
- distinct:去重
- union:合并两个RDD中的元素
- zip:将两个RDD中的元素一一对应连接成<k,v>,要求两个RDD分区数相同,每个分区包含元素个数相等
- zipPartitions:将两个RDD中的分区按照一一对应连接,要求分区个数相同,但不要求每个分区包含的元素个数相同
- zipWithIndex:对RDD中的数据进行编号,从0开始按序递增
- zipWithUniqueId:对RDD中的数据进行编号,round-robin方式
- subtractByKey:计算key在RDD1中但是不在RDD2中的record
- subtract:计算在RDD1中但是不在RDD2中的record,适用面更广,可以针对非<k,v>类型的RDD
- sortBy:基于func计算结果对RDD中的record进行排序
- glom:将RDD中每个分区的record合并到一个list中
3.3 常用的action算子
- count:计算RDD中的record个数
- countByKey:计算RDD中每个key出现的次数,RDD需要是<k,v>类型,返回Map
- countByValue:计算RDD中每个record出现的次数,返回Map
- collect:将RDD中的record收集到Driver端
- collectAsMap:将RDD中的<k,v>record收集到Driver端,得到<k,v>Map
- foreach:将RDD中的每个record按照func进行处理
- foreachPartition:将RDD中的每个分区中的数据按照func进行处理
- fold:语义与foldByKey一样,区别是foldByKey生成一个新的RDD,而fold直接计算出结果
- reduce:语义与reduceByKey一样,区别同上
- aggregate:语义与aggregateByKey一样,区别同上,需要这几个操作的原因是,我们需要全局聚合,而前面的操作只能对每个分区,以及跨分区且具有相同key的record进行聚合,不能对所有record进行全局聚合,存在的问题是Driver单点merge,存在效率和空间限制问题,优化为treeAggregate和treeReduce
- treeAggregate:使用树形聚合方法,减轻Driver端聚合压力,采用flodByKey来实现非根节点的聚合,使用fold来实现根节点的聚合
- treeReduce:类似
- reduceByKeyLocality:和reduceByKey不同在于首先在本地进行局部reduce,然后将数据汇总到Driver端进行全局Reduce,返回结果保存在HashMap中而不是RDD中
- take:取RDD中前n个record
- first:取RDD中第一个record
- takeOrdered:取RDD中最小的n个record
- top:取RDD中最大的n个record
- max:计算RDD中record的最大值
- min:计算RDD中record的最小值
- isEmpty:判断RDD是否为空
- lookup:找出RDD中包含特定key的value,组成list
- saveAsTextFile:将RDD保存为文本文件
- saveAsObjectFile:将RDD保存为序列化对象形式的文件
- saveAsSequenceFile:将RDD保存为SequenceFile形式的文件
- saveAsHadoopFile:将RDD保存为HDFS文件系统支持的文件
4.Spark物理执行计划
4.1 执行步骤
- 根据action操作将应用划分为job
- 根据每个job中的ShuffleDependency依赖关系,将job划分为stage
- 在每个stage中根据最后生成的RDD的分区个数生成多个task
5.总结及相关问题
这篇文档主要总结了大数据处理框架的通用结构,Spark的基本架构以及Spark的逻辑处理流程和物理执行流程,后面将对具体细节进行总结。
问题1:Spark为什么以线程方式运行task而不是进程方式?
用进程运行任务的好处是task之间相互独立,每个task独享资源,不会相互干扰,但是坏处是task之间不方便共享数据,会造成重复加载等问题,同时进程的启动和停止需要做很多工作,会降低执行效率。Spark采用线程为最小执行单位,好处是数据可以共享,并且提高执行效率,但是缺点是线程间会有资源竞争。
问题2:Spark中为什么要拆分为执行阶段?
主要有三个好处:
- 将job拆分为stage,使得stage中生成的task不会太大也不会太小,并且是同构的,便于并行执行
- 可以将多个操作放在一个task里面执行,进行串行、流水式处理,提高数据处理效率
- stage可以方便容忍错误,如果一个stage失效,重新运行该stage即可,不需要运行整个job
问题3:对数据依赖进行分类有什么好处?
- 明确RDD分区之间的数据依赖关系
- 有利于生成物理执行计划
- 有利于代码实现
以上是关于Spark相关总结的主要内容,如果未能解决你的问题,请参考以下文章