Spark 长文详解
Posted BasicLab基础架构实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 长文详解相关的知识,希望对你有一定的参考价值。
0、Spark 的发展史
大数据、人工智能( Artificial Intelligence )像当年的石油、电力一样,正以前所未有的广度和深度影响所有的行业,现在及未来公司的核心壁垒是数据,核心竞争力来自基于大数据的人工智能的竞争。
Spark是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台,
2009年诞生于美国加州大学伯克利分校AMP 实验室,
2010年通过BSD许可协议开源发布,
2013年捐赠给Apache软件基金会并切换开源协议到切换许可协议至Apache2.0,
2014年2月,Spark成为Apache的顶级项目
2014年11月, Spark的母公司Databricks团队使用Spark刷新数据排序世界记录
Spark成功构建起了一体化、多元化的大数据处理体系。在任何规模的数据计算中, Spark在性能和扩展性上都更具优势。
(1) Hadoop 之父Doug Cutting指出:Use of MapReduce engine for Big Data projects will decline, replaced by Apache Spark (大数据项目的MapReduce引擎的使用将下降,由Apache Spark 取代)
(2)Hadoop 商业发行版本的市场领导者Cloudera 、HortonWorks 、MapR 纷纷转投Spark,并把Spark作为大数据解决方案的首选和核心计算引擎。
2014年的如此Benchmark测试中, Spark秒杀Hadoop ,在使用十分之一计算资源的情况下,相同数据的排序上, Spark比MapReduce快3倍!在没有官方PB 排序对比的情况下,首次将S park 推到了IPB 数据(十万亿条记录) 的排序,在使用190个节点的情况下,工作负载在4小时内完成, 同样远超雅虎之前使用3800台主机耗时16个小时的记录。
2015年6月,Spark最大的集群来自腾讯–8000个节点,单个Job最大分别是阿里巴巴和Databricks–1PB,震撼人心!同时,Spark的Contributor比2014年涨了3倍,达到730人:总代码行数也比2014年涨了2倍多,达到40行。
IBM于2015 年6 月承诺大力推进Apache Spark 项目, 并称该项目为:以数据为主导的,未来十年最重要的新的开源项目。这-承诺的核心是将Spark 嵌入IBM 业内领先的分析和商务平台,并将Spark 作为一项服务,在IBMB平台上提供给客户。IBM还将投入超过3500名研究和开发人员在全球10余个实验室开展与Spark相关的项目,并将为Spark开源生态系统无偿提供突破性的机器学习技术–IBM SystemML。同时,IBM还将培养超过100万名Spark数据科学家和数据工程师。
2016年,在有“计算界奥运会”之称的国际著名Sort Benchmark全球数据排序大赛中,由南京大学计算机科学与技术系PASA大数据实验室、阿里巴巴和Databricks公司组成的参赛因队NADSort,以144美元的成本完成lOOTB标准数据集的排序处理,创下了每TB数据排序1.44美元成本的最新世界纪录,比2014 年夺得冠军的加州大学圣地亚哥分校TritonSort团队每TB数据4.51美元的成本降低了近70%,而这次比赛依旧使用Apache Spark大数据计算平台,在大规模并行排序算法以及Spark系统底层进行了大量的优化,以尽可能提高排序计算性能并降低存储资源开销,确保最终赢得比赛。
在FullStack理想的指引下,Spark中的Spark SQL 、SparkStreaming 、MLLib 、GraphX 、R五大子框架和库之间可以无缝地共享数据和操作, 这不仅打造了Spark在当今大数据计算领域其他计算框架都无可匹敌的优势, 而且使得Spark正在加速成为大数据处理中心首选通用计算平台。
其次 Spark 为什么会流行呢????
原因1:优秀的数据模型和计算抽象
Spark 产生之前,已经有MapReduce这类非常成熟的计算系统存在了,并提供了高层次的API(map/reduce),把计算运行在集群中并提供容错能力,从而实现分布式计算。
虽然MapReduce提供了对数据访问和计算的抽象,但是对于数据的复用就是简单的将中间数据写到一个稳定的文件系统中(例如HDFS),所以会产生数据的复制备份,磁盘的I/O以及数据的序列化,所以在遇到需要在多个计算之间复用中间结果的操作时效率就会非常的低。而这类操作是非常常见的,例如迭代式计算,交互式数据挖掘,图计算等。
认识到这个问题后,学术界的AMPLab提出了一个新的模型,叫做RDD。RDD是一个可以容错且并行的数据结构(其实可以理解成分布式的集合,操作起来和操作本地集合一样简单),它可以让用户显式的将中间结果数据集保存在内存中,并且通过控制数据集的分区来达到数据存放处理最优化.同时RDD也提供了丰富的 API (map、reduce、foreach、redeceByKey…)来操作数据集。后来 RDD被AMPLab在一个叫做 Spark 的框架中提供并开源.
简而言之,Spark借鉴了MapReduce思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
原因2:完善的生态圈
目前,Spark已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目
Spark Core:实现了Spark的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。
Spark SQL:Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用SQL操作数据。
Spark Streaming:Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API。
Spark MLlib:提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。
GraphX(图计算):Spark中用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。
集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。
扩展阅读:Spark VS Hadoop
★注意:
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop ,Spark主要用于替代Hadoop中的MapReduce计算模型。存储依然可以使用HDFS,但是中间结果可以存放在内存中;调度可以使用Spark内置的,也可以使用更成熟的调度系统YARN等
实际上,Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。
此外,Hadoop可以使用廉价的、异构的机器来做分布式存储与计算,但是,Spark对硬件的要求稍高一些,对内存与CPU有一定的要求。
一、Spark 概述详解
1、Spark 是什么?
Apache Spark是用于大规模数据处理的统一分析引擎
Spark基于内存计算,提高了在大数据环境下数据处理的实时性,
同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。
Spark 官网
http://spark.apachecn.org
http://spark.apachecn.org
2、Spark 架构组成及原理
1.Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
2.Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。
3.Spark Streaming:对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据。
4.MLlib:一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
5.GraphX:控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作。
Spark 架构的组成图如下:
有图可见,Master是Spark的主控节点,在实际的生产环境中会有多个Master,只有一个Master处于active状态。Worker是Spark的工作节点,向Master汇报自身的资源、Executeor执行状态的改变,并接受Master的命令启动 Executor或Driver。Driver是应用程序的驱动程序,每个应用包括许多小任务,Driver负责推动这些小任务的有序执行。
Executor是Spark的工作进程,由Worker监管,负责具体任务的执行。
Master-Worker(分别的作用及关系)
整个Spark集群中,分为Master节点与worker节点,同时一个集群有多个master节点和多个worker节点。
1.Master:主节点,该节点负责管理worker节点,我们从master节点提交应用,负责将串行任务变成可并行执行的任务集Tasks,同时还负责出错问题处理等;
2.Worker:从节点,该节点与master节点通信,负责执行任务并管理executor进程。它为集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点。
1.Application
Appliction都是指用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码
2.Driver
Driver是spark的驱动节点,用于执行spark任务中的main方法,负责实际代码的执行工作。主要负责以下任务:
1)将用户程序转化为作业(job)
2)在Executor之间调度任务
3)跟踪Executor的执行情况
4)通过 UI 展示查询运行情况
通俗理解Driver就是驱使整个应用运行起来的程序,也称之为Driver类
3.Executor
理解1:
Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在Spark作业中运行具体任务(Task),任务彼此之间相互独立。Spark应用启动时,Executor节点被同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
1)负责运行组成Spark应用的任务,并将结果返回给驱动器进程
2)它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD 提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
理解2:
执行器,为某个Application运行在worker节点上的一个进程,该进程负责运行某些Task,并且负责将数据存到内存或磁盘上,每个Application都有各自独立的一批Executor进程。executor宿主在worker节点上,每个 Worker 上存在一个或多个Executor进程,每个executor持有一个线程池,每个线程可以执行一个task。根据Executor上CPU-core的数量,其每个时间可以并行多个跟core一样数量的task。task任务即为具体执行的Spark程序的任务。executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有一个功能就是为应用程序中要求缓存的 RDD 提供内存式存储,RDD是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
4.Cluter Manager
集群管理器,指的是在集群上获取资源的外部服务。目前有三种类型:
1)Standalone : spark原生的资源管理,由Master负责资源的分配,易于构建集群
2)Apache Mesos:通用的集群管理,与hadoop MR兼容性良好的一种资源调度框架,可以在其上运行Hadoop MapReduce和一些服务应用
3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高。
5.Task(任务)
真正执行计算的部分。Stage相当于TaskSet,每个Stage内部包含了多个Task,将各个Task下发到各个Executor执行计算。
每个Task的处理逻辑完全一样,不同的是对应处理的数据。即:移动计算而不是移动数据。
Task是真正干活的,所以说是它间接决定了Spark程序的快慢也不过分。
6.Job(作业)
Spark根据行动操作触发提交作业,以行动操作将我们的代码切分为多个Job。
7.Stage(调度阶段)
每个Job中,又会根据宽依赖将Job划分为多个Stage(包括ShuffleMapStage和ResultStage)。
Job、Stage、Task 的对应关系如下:
8.DAGScheduler
根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法
扩展(DAG 划分 Stage 原理):
Spark 在分布式环境下将数据分区, 然后将作业转化为DAG, 并分阶段进行 DAG的调度和任务的分布式并行处理。DAG将调度提交给DAGScheduler, DAGScheduler调度时会根据是否需要经过Shuffle过程将 Job划分为多个Stage。
为了方便理解DAGScheduler划分Stage的原理,下面来看一个典型的DAG划分Stage示意图,如图所示。
在上图中,RDD a 到 ShuffledRDD之间, 以及UnionRDD到CoGroupedRDD之间的数据需要经过Shuffle过程, 因此ROD a 和UnionRDD分别是Stage 1 跟Stage 3和Stage 2 跟Stage 3的划分点。而ShuffledRDD到CoGroupedRDD之间,以及RDD b到MappedRDD到UnionRDD和RDD c 到UnionRDD之间的数据不需要经过Shuffle过程。因此,ShuffledRDD和CoGroupedRDD的依赖是窄依赖,两个RDD属于同一个Stage3,其余RDD划分为2个Stage。Stage1和Stage2是相对独立的,可以并行运行。Stage3则依赖于Stage1和Stage2的运行结果,所以Stage3最后执行。
由此可见,在DAGScheduler调度过程中,Stage阶段换份是依据作业是否有Shuffle过程,也就是存在ShuffleDependency的宽依赖时,需要进行Shuffle,此时才会将作业划分为多个Stage。
9.TASKSedulter
TaskScheduler概念:
将TaskSET提交给worker运行,每个Executor运行什么Task就是在此处分配的。TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。
TaskScheduler 原理:
1)DAGScheduler在提交Taskset给底层调度器的时候是面向接口TaskScheduler的,这符合面向对象中依赖抽象原则,带来底层资源调度器的可插拔性, 导致Spark可以运行在众多的资源高度器模式上。例如:Standalone, Yarn, Mesos, local. EC2及其它自定义的资源调度器
2)在SparkContext实例化的时候通过createTaskScheduler来创建 TaskSchedulerImpl和StandaloneSchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend 传进来从而赋值给TaskSchedulerImpl的backend;在TaskSchedulerImple调用start方法的时候会调用 backend.start方法。
3)TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果
a)为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息;
b)遇到 Straggle任务会放到其它的节点进行重试。
c)向DAGScheduler 汇报执行情况,包括在Shuffle输出lost的时候报告fetch failed错误等信息。
4)TaskScheduler内部会握有SchedulerBackend,从Standalone的模式来讲,具体实现是StandaloneSchedulerBackend。
5)StandaloneSchedulerBackend在启动的时候创建StandaloneAppClient实例并在该实例Start的时候启动了ClientEndpoint这个消息循环体,ClientEndpoint在启动的时候会向Master注册当前程序。而StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndpoint的消息循环体。StandaloneSchedulerBackend专门负责收集Worker上的资源信息。当ExecutorBackend启动的时候会发送RegisteredExecutor信息向DriverEndpoint注册。此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,就是通过 StandaloneSchedulerBackend拥有的计算资源来具体运行Task。
6)SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在应用程序启动的时候只实例化一次,应用程序存在期间始终存在这些对象。
下图展示了 TaskScheduler 的作用:
3、Spark 特点!
● 快
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,
基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,
可以通过基于内存来高效处理数据流。
● 易用(算法多)
MR只支持一种计算算法,Spark支持多种算法。
Spark支持Java、Python、R和Scala的API,还支持超过80种高级算法,
使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的 shell,
可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
● 通用
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、
实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
这些不同类型的处理都可以在同一个应用中无缝使用。Spark 统一的解决方案非常具有吸引力,
毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
● 兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,
并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。
这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。
Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,
这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用 Spark。
此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
4、Spark 的几种运行模式
1.local 本地模式(单机)–开发测试使用
分为local单线程和local-cluster多线程
2.standalone 独立集群模式–开发测试使用
典型的Mater/slave模式
3.standalone-HA 高可用模式–生产环境使用
基于standalone模式,使用zk搭建高可用,避免Master是有单点故障的
4.on yarn 集群模式–生产环境使用
运行在yarn集群之上,由yarn负责资源管理,Spark负责任务调度和计算,
好处:计算资源按需伸缩,集群利用率高,共享底层存储,避免数据跨集群迁移。
5.on mesos 集群模式–国内使用较少
运行在mesos资源管理器框架之上,由mesos负责资源管理,Spark负责任务调度和计算
6.on cloud 集群模式–中小公司未来会更多的使用云服务
比如AWS的EC2,使用这个模式能很方便的访问Amazon的S3
二、Spark 的三大集群模式详解!(附带各模式部署步骤)
1、standalone 集群模式
1.1 集群角色介绍
standalone 独立集群模式–开发测试使用
Spark是基于内存计算的大数据并行计算框架,
实际中运行计算任务肯定是使用集群模式,
那么我们先来学习Spark自带的standalone集群模式了解一下它的架构及运行机制。
Standalone集群使用了分布式计算中的master-slave模型,
master是集群中含有master进程的节点
slave是集群中的worker节点含有Executor9进程
Spark 架构图如下(先了解):
1.2 集群规划
node01:master
node02:slave/worker
node03:slave/worker
1.3 修改配置并分发
1)修改 Spark 配置文件(进入到自己解压后的 spark 目录下)
cd /export/servers/spark/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
#配置java环境变量(如果之前配置过了就不需要动了)
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077
mv slaves.template slaves
vim slaves
node02
node03
2)配置 spark 环境变量 (建议不添加,避免和 Hadoop 的命令冲突)
将spark添加到环境变量,添加以下内容到 /etc/profile
export SPARK_HOME=/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
export PATH=$PATH:$SPARK_HOME/bin
注意:
hadoop/sbin 的目录和 spark/sbin 可能会有命令冲突:
start-all.sh stop-all.sh
解决方案:
1.把其中一个框架的sbin从环境变量中去掉;
2.改名 hadoop/sbin/start-all.sh 改为: start-all-hadoop.sh
3)通过 scp 命令将配置文件分发到其他机器上
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers
scp /etc/profile root@node02:/etc
scp /etc/profile root@node03:/etc
source /etc/profile 刷新配置
1.4 启动和停止
集群启动和停止
在主节点上启动spark集群
/export/servers/spark/sbin/start-all.sh
在主节点上停止spark集群
/export/servers/spark/sbin/stop-all.sh
单独启动和停止
在master安装节点上启动和停止master:
start-master.sh
stop-master.sh
在Master所在节点上启动和停止worker(work指的是slaves配置文件中的主机名)
start-slaves.sh
stop-slaves.sh
1.5 查看 web 界面
正常启动spark集群后,查看spark的web界面,查看相关信息。
http://node01:8080/
1.6 测试
需求
使用集群模式运行Spark程序读取HDFS上的文件并执行WordCount
集群模式启动 spark-shell
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-shell --master spark://node01:7077
运行程序
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output2")
SparkContext web UI
http://node01:4040/jobs/
注意
集群模式下程序是在集群上运行的,不要直接读取本地文件,应该读取hdfs上的 因为程序运行在集群上,具体在哪个节点上我们运行并不知道,其他节点可能并没有那个数据文件
2、standalone-HA 高可用模式
2.1 原理
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。
如何解决这个单点故障的问题,Spark提供了两种方案:
1.基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。
2.基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。
2.2 配置 HA
该 HA 方案使用起来很简单,首先启动一个 ZooKeeper 集群,然后在不同节点上启动 Master,注意这些节点需要具有相同的 zookeeper 配置。
1)先停止Sprak集群
/export/servers/spark/sbin/stop-all.sh
2)在node01上配置:
vim /export/servers/spark/conf/spark-env.sh
3)注释掉Master配置
#export SPARK_MASTER_HOST=node01
4)在spark-env.sh添加SPARK_DAEMON_JAVA_OPTS,内容如下:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181
-Dspark.deploy.zookeeper.dir=/spark"
参数说明
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。
2.3 启动 zk 集群
zkServer.sh status
zkServer.sh stop
zkServer.sh start
2.4 启动 Spark 集群
1)node01上启动Spark集群执行
/export/servers/spark/sbin/start-all.sh
2)在node02上再单独只起个master:
/export/servers/spark/sbin/start-master.sh
3)注意:
在普通模式下启动spark集群
只需要在主节点上执行start-all.sh 就可以了
在高可用模式下启动spark集群
先需要在任意一台主节点上执行start-all.sh
然后在另外一台主节点上单独执行start-master.sh
4)查看node01和node02
http://node01:8080/
http://node02:8080/
可以观察到有一台状态为StandBy
2.5 测试 HA
测试主备切换
1)在node01上使用jps查看master进程id
2)使用kill -9 id号强制结束该进程
3)稍等片刻后刷新node02的web界面发现node02为Alive
2.6 测试集群模式提交任务
1.集群模式启动spark-shell
/export/servers/spark/bin/spark-shell --master
spark://node01:7077,node02:7077
2.运行程序
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output3")
3、 on yarn 集群模式
官方文档http://spark.apache.org/docs/latest/running-on-yarn.html
3.1 准备工作
1.安装启动Hadoop(需要使用HDFS和YARN,已经ok)
2.安装单机版Spark(已经ok)
注意:不需要集群,因为把Spark程序提交给YARN运行本质上是把字节码给YARN集群上的JVM运行,
但是得有一个东西帮我去把任务提交上个YARN,所以需要一个单机版的Spark,
里面的有spark-shell命令,spark-submit命令
3.修改配置:
在spark-env.sh ,添加HADOOP_CONF_DIR配置,指明了hadoop的配置文件的位置
vim /export/servers/spark/conf/spark-env.sh
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
3.2 cluster 模式
1)说明
在企业生产环境中大部分都是cluster部署模式运行Spark应用
Spark On YARN的Cluster模式 指的是Driver程序运行在YARN集群上
2)补充Driver是什么:
运行应用程序的main()函数并创建SparkContext的进程
3)图解
4)运行示例程序
spark-shell是一个简单的用来测试的交互式窗口
spark-submit用来提交打成jar包的任务
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \\
--class org.apache.spark.examples.SparkPi \\
--master yarn \\
--deploy-mode cluster \\
--driver-memory 1g \\
--executor-memory 1g \\
--executor-cores 2 \\
--queue default \\
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \\
10
5)查看界面
http://node01:8088/cluster
3.3 client 模式[了解]
1)说明
学习测试时使用,开发不用,了解即可
Spark On YARN的Client模式 指的是Driver程序运行在提交任务的客户端
2)图解
3)运行示例程序
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \\
--class org.apache.spark.examples.SparkPi \\
--master yarn \\
--deploy-mode client \\
--driver-memory 1g \\
--executor-memory 1g \\
--executor-cores 2 \\
--queue default \\
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \\
10
3.4 两种模式的区别
Cluster 和 Client 模式最本质的区别是:Driver 程序运行在哪里!
运行在YARN集群中就是Cluster模式,
运行在客户端就是Client模式
当然还有由本质区别延伸出来的区别,面试的时候能简单说出几点就行
cluster模式:生产环境中使用该模式
1.Driver程序在YARN集群中
2.应用的运行结果不能在客户端显示
3.该模式下Driver运行ApplicattionMaster这个进程中,如果出现问题,yarn会重启ApplicattionMaster(Driver)
client模式:
1.Driver运行在Client上的SparkSubmit进程中
2.应用程序运行结果会在客户端显示
4、 Spark 参数详解
4.1 spark-shell
spark-shell 是 Spark 自带的交互式 Shell 程序,方便用户进行交互式编程,用户可以在该命令行下可以用 scala 编写 spark 程序,适合学习测试时使用!
示例:
spark-shell可以携带参数
spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务
spark-shell --master local[*] *表示使用当前机器上所有可用的资源
默认不携带参数就是--master local[*]
spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上
4.2 spark-submit
spark-submit 命令用来提交 jar 包给 spark 集群/YARN spark-shell 交互式编程确实很方便我们进行学习测试,但是在实际中我们一般是使用 IDEA 开发 Spark 应用程序打成 jar 包交给 Spark 集群/YARN 去执行。spark-submit 命令是我们开发时常用的!!!
示例:计算π
cd /export/servers/spark
/export/servers/spark/bin/spark-submit \\
--class org.apache.spark.examples.SparkPi \\
--master spark://node01:7077 \\
--executor-memory 1g \\
--total-executor-cores 2 \\
/export/servers/spark/examples/jars/spark-examples_2.11-2.2.0.jar \\
10
4.3 参数总结
Master 参数形式
其他参数示例
--master spark://node01:7077 指定Master的地址
--name "appName" 指定程序运行的名称
--class 程序的main方法所在的类
--jars xx.jar 程序额外使用的jar包
--driver-memory 512m Driver运行所需要的内存, 默认1g
--executor-memory 2g 指定每个executor可用内存为 2g, 默认1g
--executor-cores 1 指定每一个executor可用的核数
--total-executor-cores 2 指定整个集群运行任务使用的cup 核数为2个
--queue default 指定任务的对列
--deploy-mode 指定运行模式(client/cluster)
注意:
如果worker节点的内存不足,那么在启动spark-submit的时候,就不能为executor分配超出worker可用的内存容量。
如果–executor-cores超过了每个worker可用的cores,任务处于等待状态。
如果–total-executor-cores即使超过可用的cores,默认使用所有的。以后当集群其他的资源释放之后,就会被该程序所使用。
如果内存或单个executor的cores不足,启动spark-submit就会报错,任务处于等待状态,不能正常执行。
三、SparkCore 入门详解
1、RDD 详解
1.1 什么是 RDD???
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集 ,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合 。
单词拆解
Resilient :它是弹性的,RDD中的数据可以保存在内存中或者磁盘里面
Distributed :它里面的元素是分布式存储的,可以用于分布式计算
Dataset: 它是一个集合,可以存放很多元素
1.2 为什么要有 RDD?
在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。
AMP实验室发表的一篇关于RDD的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是为了解决这些问题的
RDD提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
1.3 RDD 的主要属性
1)A list of partitions :
一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。
对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度。
用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。
2)A function for computing each split :
一个函数会被作用在每一个分区。
Spark中RDD的计算是以分区为单位的,compute函数会被作用到每个分区上
3)A list of dependencies on other RDDs:
一个RDD会依赖于其他多个RDD。
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。(Spark的容错机制)
4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
Spark中的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。
对于KV类型的RDD会有一个Partitioner函数,即RDD的分区函数(可选项)
只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。
5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):
可选项,一个列表,存储每个Partition的位置(preferred location)。
对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。
总结
RDD 是一个数据集,不仅表示了数据集,还表示了这个数据集从哪来,如何计算。
主要属性包括
1.多分区
2.计算函数
3.依赖关系
4.分区函数(默认是hash)
5.最佳位置
2、RDD-API
2.1 创建 RDD
1)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
2)通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3)由一个已经存在的Scala集合创建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
makeRDD方法底层调用了parallelize方法
2.2 RDD 的方法/算子分类
2.2.1 分类
RDD 的算子分为两类:
1)Transformation转换操作:返回一个新的RDD
2)Action动作操作:返回值不是RDD(无返回值或返回其他的)
注意:
RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)
RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给Driver的 Action动作时,这些转换才会真正运行。
之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。
2.2.2 Transformation 转换算子
转换 | 含义 |
---|---|
map(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 |
filter(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 |
flatMap(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 |
union(otherDataset) | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD |
intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD |
distinct([numTasks])) | 对源 RDD 进行去重后返回一个新的 RDD |
groupByKey([numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 分组操作。调用 groupByKey,常用。类似于 aggregate,操作的数据类型。 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD |
sortBy(func,[ascending], [numTasks]) | 与 sortByKey 类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 对 rdd 进行管道操作 |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 |
repartition(numPartitions) | 重新给 RDD 分区 |
显示详细信息
2.2.3 Action 动作算子
动作 | 含义 |
---|---|
reduce(func) | 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
first() | 返回 RDD 的第一个元素(类似于 take(1)) |
take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数 func 进行更新。 |
foreachPartition(func) | 在数据集的每一个分区上,运行函数 func |
显示详细信息
统计操作
算子 | 含义 |
---|---|
count | 个数 |
mean | 均值 |
sum | 求和 |
max | 最大值 |
min | 最小值 |
variance | 方差 |
sampleVariance | 从采样中计算方差 |
stdev | 标准差:衡量数据的离散程度 |
sampleStdev | 采样的标准差 |
stats | 查看统计结果 |
显示详细信息
2.3 基础练习[快速演示]
2.3.1 准备工作
集群模式启动
启动Spark集群
/export/servers/spark/sbin/start-all.sh
启动spark-shell
/export/servers/spark/bin/spark-shell \\
--master spark://node01:7077 \\
--executor-memory 1g \\
--total-executor-cores 2
或本地模式启动
/export/servers/spark/bin/spark-shell
2.3.2 WordCount
val res = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//上面的代码不会立即执行,因为都是 Transformation 转换操作 //下面的代码才会真正的提交并执行,因为是 Action 动作/行动操作
res.collect
2.3.3 创建 RDD
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10))
2.3.4 查看该 RDD 的分区数量
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length
//没有指定分区数,默认值是2
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length
//指定了分区数为3
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").partitions.length
//2
RDD 分区的数据取决于哪些因素?
RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源,但是在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍。RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数 有关系
分区原则
1)启动的时候指定的CPU核数确定了一个参数值:
spark.default.parallelism=指定的CPU核数(集群模式最小2)
2)对于Scala集合调用parallelize(集合,分区数)方法,
如果没有指定分区数,就使用spark.default.parallelism,
如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3)对于textFile(文件,分区数) defaultMinPartitions
如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)
如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数
rdd 的分区数
对于本地文件:
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
对于HDFS文件:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
所以如果分配的核数为多个,且从文件中读取数据创建RDD,即使hdfs文件只有1个切片,最后的Spark的RDD的partition数也有可能是2
2.3.5 不同转换算子的意义以及应用
1)map
对RDD中的每一个元素进行操作并返回操作的结果
//通过并行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//对rdd1里的每一个元素
rdd1.map(_ * 2).collect
//collect方法表示收集,是action操作
2)filter
注意:函数中返回True的被留下,返回False的被过滤掉
val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd3 = rdd2.filter(_ >= 10)
rdd3.collect
//10
3)flatmap
对RDD中的每一个元素进行先map再压扁,最后返回操作的结果
val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))
//将rdd1里面的每一个元素先切分再压平
val rdd2 = rdd1.flatMap(_.split(’ '))
rdd2.collect
//Array[String] = Array(a, b, c, d, e, f, h, i, j)
4)sortBy
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd2 = rdd1.sortBy(x=>x,true)
// x=>x 表示按照元素本身进行排序,True表示升序
rdd2.collect
//1,2,3,…
val rdd2 = rdd1.sortBy(x=>x+"",true)
//x=>x+""表示按照x的字符串形式排序变成了字符串,结果为字典顺序
rdd2.collect
//1,10,2,3…
5)交集、并集、差集、笛卡尔积
注意类型要一致
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//union不会去重
val rdd3 = rdd1.union(rdd2)
rdd3.collect
//去重
rdd3.distinct.collect
//求交集
val rdd4 = rdd1.intersection(rdd2)
rdd4.collect
//求差集
val rdd5 = rdd1.subtract(rdd2)
rdd5.collect
//笛卡尔积
val rdd1 = sc.parallelize(List(“jack”, “tom”))//学生
val rdd2 = sc.parallelize(List(“java”, “python”, “scala”))//课程
val rdd3 = rdd1.cartesian(rdd2)//表示所有学生的所有选课情况
rdd3.collect
//Array[(String, String)] = Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala))
6)join
join(内连接)聚合具有相同key组成的value元组
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2)))
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))
图解 1
val rdd4 = rdd1.leftOuterJoin(rdd2) //左外连接,左边的全留下,右边的满足条件的才留下
rdd4.collect
//Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
图解 2
val rdd5 = rdd1.rightOuterJoin(rdd2)
rdd5.collect
//Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),2)), (tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
val rdd6 = rdd1.union(rdd2)
rdd6.collect
//Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
7)groupbykey
groupByKey()的功能是,对具有相同键的值进行分组。
比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),
采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
//按key进行分组
val rdd6 = sc.parallelize(Array((“tom”,1), (“jerry”,2), (“kitty”,3), (“jerry”,9), (“tom”,8), (“shuke”,7), (“tom”,2)))
val rdd7=rdd6.groupByKey
rdd7.collect
//Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))
8)cogroup[了解]
cogroup是先RDD内部分组,在RDD之间分组
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect
// Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
9)groupBy
根据指定的函数中的规则/key进行分组
val intRdd = sc.parallelize(List(1,2,3,4,5,6))
val result = intRdd.groupBy(x=>if(x%2 == 0)“even” else “odd”).collect
//Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6)), (odd,CompactBuffer(1, 3, 5)))
10)reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val result = rdd1.reduce(_ + )
// 第一 上次一个运算的结果,第二个_ 这一次进来的元素
★面试题
reduceByKey是Transformation还是Action? --Transformation
reduce是Transformation还是Action? --Action
11)reducebykey
注意reducebykey是转换算子
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。
比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)
对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。
可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,
比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2) //并集
rdd3.collect
//Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5))
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,3), (kitty,7))
12)repartition
改变分区数
val rdd1 = sc.parallelize(1 to 10,3) //指定3个分区
//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.length //新生成的rdd分区数为2
rdd1.partitions.length //3 //注意:原来的rdd分区数不变
//增加分区
rdd1.repartition(4).partitions.length
//减少分区
rdd1.repartition(3).partitions.length
//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size
rdd1.coalesce(4).partitions.size
★注意:
repartition可以增加和减少rdd中的分区数,
coalesce默认减少rdd分区数,增加rdd分区数不会生效。
不管增加还是减少分区数原rdd分区数不变,变的是新生成的rdd的分区数
★应用场景:
在把处理结果保存到hdfs上之前可以减少分区数(合并小文件)
sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
.flatMap(.split(" ")).map((,1)).reduceByKey(+)
.repartition(1)
//在保存到HDFS之前进行重分区为1,那么保存在HDFS上的结果文件只有1个
.saveAsTextFile(“hdfs://node01:8020/wordcount/output5”)
13)collect
val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2)
rdd1.collect
14)count
count统计集合中元素的个数
rdd1.count //6
求RDD中最外层集合里面的元素的个数
val rdd3 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”)))
rdd3.count //3
15)distinct
val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3)
rdd.distinct.collect
16)top
//取出最大的前N个
val rdd1 = sc.parallelize(List(3,6,1,2,4,5))
rdd1.top(2)
17)take
//按照原来的顺序取前N个
rdd1.take(2) //3 6
//需求:取出最小的2个
rdd1.sortBy(x=>x,true).take(2)
18)first
//按照原来的顺序取前第一个
rdd1.first
19)keys、values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.collect
//Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
rdd2.keys.collect
//Array[Int] = Array(3, 5, 4, 3, 7, 5)
rdd2.values.collect
//Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
20)mapValues
mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后
val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))
val rdd2 = rdd1.mapValues(_*2).collect //_表示每一个value ,key不变,将函数作用于value
//(1,20),(2,40),(3,60)
21)collectAsMap
转换成Map
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
//scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
面试题:foreach 和 foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreach(x => println(x*100)) //x是每一个元素
rdd1.foreachPartition(x => println(x.reduce(_ + _))) //x是每个分区
注意:foreach和foreachPartition都是Action操作,但是以上代码在spark-shell中执行看不到输出结果,
原因是传给foreach和foreachPartition的计算函数是在各个分区执行的,即在集群中的各个Worker上执行的
应用场景:
比如在函数中要将RDD中的元素保存到数据库
foreach:会将函数作用到RDD中的每一条数据,那么有多少条数据,操作数据库连接的开启关闭就得执行多少次
foreachPartition:将函数作用到每一个分区,那么每一个分区执行一次数据库连接的开启关闭,有几个分区就会执行数据库连接开启关闭
import org.apache.spark.SparkConf, SparkContext
object Test
def main(args: Array[String]): Unit =
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(config)
//设置日志输出级别
sc.setLogLevel("WARN")
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//Applies a function f to all elements of this RDD.
//将函数f应用于此RDD的所有元素
rdd1.foreach(x => println(x*100))
//把函数传给各个分区,在分区内循环遍历该分区中的元素
//x每个元素,即一个一个的数字
println("==========================")
//Applies a function f to each partition of this RDD.
//将函数f应用于此RDD的每个分区
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
//把各个分区传递给函数执行
//x是每个分区
面试题:map 和 mapPartitions
将每一个分区传递给函数
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.mapPartitions(x=>x.map(y=>y*2)).collect
//x是每一个分区,y是分区中的元素
扩展:mapPartitionsWithIndex(同时获取分区号)
功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//该函数的功能是将对应分区中的数据取出来,并且带上分区编号
// 一个index 分区编号
// 一个iter分区内的数据
val func = (index: Int, iter: Iterator[Int]) =>
iter.map(x => “[partID:” + index + ", val: " + x + “]”)
rdd1.mapPartitionsWithIndex(func).collect
//Array[String] = Array(
[partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3],
[partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6],
[partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9]
)
扩展:aggregate
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//0表示初始值
//第一个_+,表示区内聚合,第一个_表示历史值,第二个_表示当前值
//第二个+_,表示区间聚合,第一个_表示历史值,第二个_表示当前值
val result1: Int = rdd1.aggregate(0)( _ + _ , _ + _) //45 ==> 6 + 15 + 24 = 45
//10表示初始值,每个分区有初始值,区间聚合的时候也有初始值
val result2: Int = rdd1.aggregate(10)( _ + _ , _ + _) //85 ==> 10+ (10+6 + 10+15 + 10+24)=85
扩展:combineByKey
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map((, 1))
//Array((hello,1), (me,1), (hello,1), (you,1), (hello,1), (her,1))
//x => x,表示key不变
//(a: Int, b: Int) => a + b:表示区内聚合
//(m: Int, n: Int) => m + n:表示区间聚合
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
//val rdd2 = rdd1.combineByKey(x => x, _ + _ , _ + _ )//注意这里简写错误,原则:能省则省,不能省则不要偷懒
rdd2.collect
//Array[(String, Int)] = Array((hello,3), (me,1), (you,1), (her,1))
val rddData1: RDD[(String, Float)] = sc.parallelize(
Array(
(“班级1”, 95f),
(“班级2”, 80f),
(“班级1”, 75f),
(“班级3”, 97f),
(“班级2”, 88f)),
2)
val rddData2 = rddData1.combineByKey(
grade => (grade, 1),
(gc: (Float, Int), grade) => (gc._1 + grade, gc._2 + 1),
(gc1: (Float, Int), gc2: (Float, Int)) => (gc1._1 + gc2._1, gc1._2 + gc2._2)
)
val rddData3 = rddData2.map(t => (t._1, t._2._1 / t._2._2))
rddData3.collect
扩展:aggregateByKey
val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func(index: Int, iter: Iterator[(String, Int)]) :以上是关于Spark 长文详解的主要内容,如果未能解决你的问题,请参考以下文章