spark
Posted lvhongwi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark相关的知识,希望对你有一定的参考价值。
1、spark基础,什么是spark
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个
子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用
户将Spark部署在大量廉价硬件之上,形成集群。
-
-
关键词:
-
统一:Spark能够适应多种计算场景 (离线计算、实时计算、机器学习、图计算、AI应用)。一般公司在进行技术选型过程,spark首选
-
大数据分析引擎:Spark能够分析数据,但是没有存储。一般线上的spark数据来源 (HDFS, Hive、Kafka、Flume、日志文件、关系型数据库、NoSQL数据库)。Spark数据出口(HDFS、Hive、Kafka、Redise、关系型数据库、NoSQL数据库)
-
分布式:Spark一般情况是以集群模式存在。架构 :Master/Slaver(主从结构)
-
-
应用场景
-
精准广告推荐系统(Spark机器学习,一般在广告或者电商公司应用)
-
金融风险管控系统 (对实时性要求比较,起码毫秒级)
-
精细化运行系统 (CMS系统 、BI系统,重点:多维分析)
-
用户画像 (用户数据画像)
-
-
2、spark的特点
-
-
典型数据处理流程:Spark在使用过程中,会读取HDFS上数据,并且会将HDFS中数据驻留在内存当中,将数据进行缓存、在后续数据迭代操作过程能够重用内存中的数。在逻辑回归处理(算法)中,Spark的速度要比Hadoop 理论上快100倍
-
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
-
-
Spark对程序员非常友好
-
spark支持多种语言(Java、Scala、Python、R、SQL)
-
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
-
-
Spark一站式解决方案
-
五大模块
-
SparkCore (处理离线数据)
-
SparkSQL (主要用来做多维数据分析、以及交互式查询)
-
SparkStreaming (实时数据处理程序)
-
Spark MLlib (机器学习 包含非常多算法,相当于Spark提供的一个算法)
-
Spark Graphx (图计算处理模块)
在开发Spark应用程序过程中,能够同时使用以上所有模块。以上模块能够无缝兼容
-
-
-
Spark能够兼容 (hadoop、hive、hbase、yarn、kafka、flume、redise、关系型数据等)
-
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。
-
-
3、spark中重要的角色
Spark架构使用了分布式计算中master-slave模型,master是集群中含有master进程的节点,slave是集群中含有worker进程的节点。
-
-
Driver Program :运?main函数并且新建SparkContext的程序。
-
-
-
Application:基于Spark的应用程序,包含了driver程序和集群上的executor。
-
-
-
Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型
-
(1)Standalone: spark原生的资源管理,由Master负责资源的分配
(2)Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
(3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
-
-
-
-
Worker Node: 集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slaves文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点
-
-
-
-
-
-
-
Executor:是在一个worker node上为某应用启动的?个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executor。
-
-
-
-
-
-
Task :被送到某个executor上的工作单元。
-
-
ClusterManger:集群资源管理器,会接受SparkContext发送来指令(申请资源),然后向workerNode节点发送指令分配资源
-
standalone模式: spark的master节点
-
yarn模式:ResourceManage
-
-
WorkerNode:提供Spark应用程序运行时所需要的资源 (CPU和内存)。Workernode 在接受到clusterManger的指令后,会汇报worker的信息。
以上节点完成了Spark应用程序运行时所需要的资源
资源分配方式:
-
静态分配:一次性费配资源,在整个spark应用程序运行过程中,不会再次分配资源
-
standalone:属于静态分配
-
-
动态分配:在整个spark应用程序运行过程中,需要多少给多少,需要多次分配资源,一旦资源使用完成,会进行回收,再次需要的时候会再次申请资源
-
yarn:动态分配
-
-
-
-
按照程序运行的方式分配
-
Driver Programe: main +sparkcontext。 一般运行在Diver节点(可以是一台机器)
-
Driver节点可有与spark 集群分离。一般情况下Driver节点其实就是Spark集群中的某台机器。这样做是为了节省网络资源。因为在这个Spark应用程序运行过程中,会不断与Spark集群进行信息传递。
-
Spark-submit 把程序提交到yarn集群,yarn会根据集群资源状况,分配一个driver,然后spark程序会将jar上传到yarn,通过yarn去执行。
-
standalone:master节点就可以作为driver节点
-
-
Sparkcontext:是spark程序的入口对象。并且还是Spark应用程序的核心调度对象。在SparkContext对象初始化过程中,初始化了三个重要调度对象:高层调度器 DAGScheduler 底层调度 TaskScheduler SchedulerBackend 负责通信
-
Executor:运行在worker节点的一个进程。在Eecutor进程中,启动线程池运行Task。通过线程并发执行和以及线程复用的形式执行Task
-
Task:是数据处理任务最小单元,整个Spark应用程序最终会被划分成不同的Task,去运行处理数据。
-
cache:缓存,主要缓存RDD中数据的, 可以缓存到内存也可以缓存到磁盘,还可以缓存HDFS之上。
以上角色 就完成了整个Spark Job的调度
-
-
4、spark中的RDD
Dataset:一个数据集合,用于存放数据的。
Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
Resilient:RDD中的数据可以存储在内存中或者磁盘中。
-
-
RDD概念:(A Resilient Distributed Dataset)弹性分布式数据集合。并且是spark最基本的编程抽象,而且RDD是只读、可分区的、可以进行并行计算的一个对象。
-
关键词:
-
数据集:RDD是一个数据容器,用来组织管理数据的。跟Array和List类似,并且都能够进行map、flatMap、filte等等
-
分布式:RDD的数据是分布存储的,也就是Spark集群中每个节点上只存储了RDD的部分数据。计算同样也是分布式并行计算的
-
弹性:
-
存储的弹性:RDD的数据可以在内存和磁盘之间进行自由切换
-
可靠性的弹性:RDD的在丢失数据的时候能够自动恢复。RDD在计算过程中会出现失败的情况,失败以后会进行一定次数的重试(4次)
-
并行度的弹性:RDD的数据分区可以改变,进而增加并行计算的粒度
-
?
-
-
-
RDD其他特点:
-
RDD的数据是只读,每次操作都会产生新的RDD。安全。
-
RDD中数据可以缓存内存、磁盘、HDFS之上
-
-
5、RDD弹性
RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),
通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,
RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作
构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也
就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。
? Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换
2) 基于血统的高效容错机制
? 在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。
3) Task如果失败会自动进行特定次数的重试
? RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。
4) Stage如果失败会自动进行特定次数的重试
? 如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。
5) Checkpoint和Persist可主动或被动触发
? RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。
6) 数据调度弹性
? Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
7) 数据分片的高度弹性
? 可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。
8、RDD算子的操作
-
转换算子(Transform算子)
-
将一个RDD通过转换算子操作以后会构建新的RDD,比如map 、flatMap、reduceByKey
-
转换算子操作都是直接new新的RDD,此时RDD并没有进行真正的计算。转换算子只是对数据如何计算做了标记。转换算子都是懒加载。
-
-
重要算子操作
-
mapPartitions :作用于每个分区之上的
-
mapPartitions 和map区别:
-
mapPartitions 相当于partition批量操作
-
map作用于每一条数据
-
重要区别:mapPartitions 这个在大量task运行的时候可能会出现内存溢出的情况。小数据量的操作 mapPartitions 要由于map操作
-
-
-
groupByKey算子和ReduceByKey算子的区别
-
1.groupByKey 返回值:key->集合 ReduceByKey返回值: key-》值
-
2.ReduceByKey操作会在本地进行初步merge操作,能够较少网络数据的传输
-
-
coalesce 减少分区数据的算子
-
概算子可以进行shuffle也可以不进shuffle操作, coalesce(numPartitions: Int, shuffle: Boolean = false)
-
-
repartition 实际上是调用了 coalesce 算子 ,而且 repartition一定会进行shuffle操作,既可以增加也可以减少分区
-
-
Action算子
-
Action是提交一个Job任务的,action算子提交job以后才开始真正的去计算转换算子
-
action算子内部都会有一个runJob方法进行提交一个Job任务
-
9、血统机制
RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,
它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD与RDD之间存在依赖关系,依赖关系都是通过转换算子构建的。转换算子都是懒加载的。Spark应用程序会通过Action算子触发Job操作,Job在运行过程中 是从后往前回溯的,回溯的时候就是根据RDD的依赖关系。
这样就构建了RDD的血统机制。有了依赖链条的存在,当RDD中数据丢失的时候,会根据血统机制进行自动恢复数据。
窄依赖:
父RDD中一个partition最多被子RDD中一个partition所依赖,所以当子RDD中一个parition数据丢失时会重算其相应的父RDD中的数据,不需要对整个RDD进行数据恢复。
宽依赖:父RDD中一个partition被子RDD中多个partition所依赖的, 所以如果子RDD中的一个partition数据丢失,那么他会重算其所依赖的所有父RDD的partition。
所以宽依赖操作会出现大量冗余的数据计算。
10、RDD的缓存机制
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
11、chickpoint机制
-
-
操作步骤:
-
sparkContext.setCheckPointDir(hdfs路径)
-
rdd.checkPoint()
-
-
checkpoint操作也是懒加载的,action算子触发的
-
checkpoint机制也会触发job操作。是在整个job执行完成后,然后启动一个job去执行缓存操作。
-
checkpoint会打破rdd的血统机制,checkpoint的job执行完成之后,会清理掉其rdd的所有依赖关系
-
-
RDD缓存和checkpoint操作应用场景
-
如果计算特点耗时(耗时操作占用整个应用程序的30%),此时需要考虑缓存
-
shuffle操作之后,有必要将rdd数据缓存
-
读取大量数据操作之后(读取数据占用整个应用程序执行的30%),此时需要考虑缓存
-
一个计算过程其计算链条过长,可以在中间比较重要的过程设置缓存
-
作为最佳实践操作,一般情况在checkpoint之前会使用缓存机制cache
-
12、work节点
-
-
Executor会接受task,并且将task进一步封装到TaskRunner中,并且将TaskRunner提交到线程池中去运行,通过线程并发执行和线程复用的而形式执行Task逻辑。线程池使用的Executors.newCachedThreadPool,缓存线程池。Executor直到执行遇到ResultTask时候,表示执行完成,会将数据输出到控制台、外部存储设备中。
13、sparksql是什么
-
1、shark框架它是专门为spark准备的大规模数据处理的数据仓库工具
-
2、shark是依赖于hive,同时也依赖于spark的版本。
-
3、后期由于代码的开发复杂度和性能要求越来越高,发现hive底层运行mr限制了shark这个框架的发展
-
4、慢慢的终止了对shark的开发和维护
-
5、把重点工作转移到了sparksql上
相比于Spark RDD API,Spark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优化,使对结构化数据的操作更加高效和方便。
有多种方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此你可以在不同的API之间随意切换,它们各有各的特点,看你喜欢那种风格。
-
Spark SQL is Apache Spark‘s module for working with structured data
- SparkSql是apache spark 一个用来处理结构化数据的模块。
它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
14、为什么要学习sparksql以及sparksql的四大特性
-
-
1、易整合
-
将sql查询与spark程序无缝混合,可以使用java、scala、python、R等语言的API操作。
-
-
-
-
2、统一的数据源访问
-
sparksql可以使用一种相同的方式去对接外部的数据源
-
SparkSession.read.文件格式方法(该文件格式的路径)
-
-
3、兼容hive
-
sparksql可以兼容hivesql,后期可以通过sparksql操作hivesql
-
-
4、支持行业标准的数据库连接
-
sparksql支持标准的数据库连接jdbc或者odbc
-
sparksql可以使用标准的数据库连接从关系型数据库中加载数据
-
后期也可以把处理完成的结果数据再次使用标准的数据库连接 写入到数据库中进行存储
-
-
15、DataFrame是什么
DataFrame前身是schemaRDD,schemaRDD是直接继承自RDD,是RDD的一个实现类,在spark1.3.0之后,把schemaRDD改名为DataFrame,DataFrame自己实现了RDD的大部分方法,并没有直接继承自RDD
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。DataFrame可
以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表。
16、DataFrame和RDD的优缺点
优点:
(1)编译时类型安全 ? 编译时就能检查出类型错误
(2)面向对象的编程风格 ? 直接通过对象调用方法的形式来操作数据
缺点:
(1)序列化和反序列化的性能开销 ? 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
(2)GC的性能开销 ? 频繁的创建和销毁对象, 势必会增加GC
DataFrame通过引入schema和off-heap(不在堆里面的内存,指的是除了不在堆的内存,使用操作系统上的内存),解决了RDD的缺点, Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结
构的部分就可以省略了;通过off-heap引入,可以快速的操作数据,避免大量的GC。但是却丢了RDD的优点,DataFrame不是类型安全的, API也不是面向对象风格的。
-
-
1、RDD
-
优点
-
1、代码开发时类型安全
-
后期再通过RDD进行代码开发,它会在编译的时候进行类型检查
-
-
2、具有面向对象编程的风格
-
-
缺点
-
1、数据的序列化和反序列化性能开销很大
-
在大数据分布式计算框架中,都会涉及到数据的跨进程进行网络传输,这里就涉及到数据的序列化,它会把数据的结构和数据本身内容进行序列化。然后在目标进程中进行反序列化来恢复得到对象数据。
-
-
2、构建大量的对象,导致频繁gc(垃圾回收)
-
RDD中封装了大量的对象,后期这些对象都需要java堆内存heap进行数据存储,这里就会导致内存不够用,后期就会频繁出现gc 垃圾回收,这里是比较影响性能。
-
-
-
-
2、DataFrame
-
dataFrame引入了2个概念:1个是schema(元数据信息) 1个是off-heap(使用操作系统层面上的内存)
-
优点
-
1、dataFrame引入了schema信息,也是dataFrame中有哪些字段名称和类型。后期数据在进行网络传输的时候,只需要把数据的本身内容进行序列化就可以了,数据的结构是不需要进行序列化,数据结构信息是可以省略掉了,最后数据的网络传输量就少了。性能就比较快了。 最后引入这个特性是解决了rdd的数据的序列化和反序列化性能开销很大缺点。
-
2、dataFrame引入了off-heap概念,大量的对象构建并不是直接使用heap堆中的内存,而是使用操作系统层面上的内存。最后导致JVM堆中的内存比较充足,不会出现GC,后期任务就不会暂停,运行的效率变快了。最后引入这个特性是解决了rdd的构建大量的对象,导致频繁gc(垃圾回收)缺点。
-
-
缺点
-
dataFrame分别解决了RDD的2个缺点,同时也丢失了RDD的优点
-
1、代码开发时类型不安全
-
2、不具有面向对象编程的风格
-
-
-
-
17、什么是sparkstriming
-
-
SparkStreaming可以非常容易的构建一个可扩展、具有容错机制的流式应用程序
-
说白一点就是一个实时处理框架
-
18、sparkstriming的特性
-
-
可以像开发离线批处理一样去编写实时处理程序,数据接收方式不一样,数据接收到之后,处理的逻辑是一样的。
-
可以支持不同语言的代码开发
-
JAVA
-
SCALA
-
PYTHON
-
-
-
2、容错性
-
sparkStreaming程序可以自己实现对应的容错机制
-
可以实现对数据的恰好一次语义
-
就是保证数据被处理 且只被处理一次
-
-
-
3、融合到spark生态系统中
-
可以把sparkStreaming实时处理与批处理和交互式查询进行结合使用
-
19、sparkstriming计算流程
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换
成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据
业务的需求可以对中间的结果进行缓存或者存储到外部设备。下图显示了Spark Streaming的整个流程。
20、sparkstriming的容错机制
对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任
意一个RDD的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。
sparkStreaming在接受到数据之后,他会把网络中的数据流拷贝多份到其他机器,最大程度的保证数据源端的安全性,数据源端的安全性得到保证之后,
以上是关于spark的主要内容,如果未能解决你的问题,请参考以下文章
SparkSpark ShuffleSpark SQL 及 Spark MLlib