Spark:基础知识01
Posted Xiao Miao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark:基础知识01相关的知识,希望对你有一定的参考价值。
文章目录
Spark基础知识
一、Spark框架
1.Spark介绍
介绍: Spark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源, 2013年6月成为Apache孵化项目,2014年2月成为 Apache 顶级项目,用 Scala进行编写项目框架。
官方地址:http://spark.apache.org/
官方地址:https://databricks.com/spark/about
官方定义:轻量级、快速的、大数据和机器学习的、统一分析引擎
1.分析引擎
分析处理数据,类似MapReduce框架
2.大规模数据
海量数据,数据很多
3.统一的分析引擎
离线分析
实时计算
机器学习
图形计算
4.分布式并行计算
分而治之思想
Spark框架中,最核心要点:抽象,称为RDD,相当于集合,比如列表List,存储数据
2.Spark四大特点
- Spark具有运行速度快、易用性好、通用性强和随处运行等特点。
Spark处理数据与MapReduce处理数据相比,有如下两个不同点:
- 1.Spark处理数据时,可以将中间处理结果数据存储到内存中;
- 2.Spark Job调度以DAG方式,并且每个任务Task执行以线程(Thread)方式,并不是像MapReduce以进程(Process)方式执行。
MapReduce计算框架与Spark计算引擎不同点:
Spark程序无处不在运行【Runs Everywhere】
-
1、数据存储
Spark分析的数据在哪里?任何地方都是可以,最主要还是HDFS、Hive、HBase、Kafka等等
-
2、程序运行
Spark编程代码,在何处执行,分析数据?本地模式、集群模式【Hadoop YARN、Mesos、Standalone】、容器(K8s)
3.Spark框架模式
整个Spark 框架模块包含:Spark Core、 Spark SQL、 SparkStreaming、 Spark GraphX、Spark MLlib,而后四项的能力都是建立在核心引擎之上 。
- 最核心,最基础SparkCore
- 四大公共模块SparkSQL、SparkStreaming、GraphX、MLlib
- 高级分析模块PySpark、SparkR
- 新的模块Structured Streaming
4.Spark运行模式
Spark 框架编写的应用程序可以运行在本地模式(Local Mode)、集群模式(Cluster Mode)和云服务(Cloud),方便开发测试和生产部署。
本地模式:Local Mode
在当前操作系统,启动JVM 进程,以线程方式运行Task任务
当Spark程序运行在本地模式时,可以设置运行几个Task(也就是几个线程):–master local[K]
- K含义:表示并行Task数目;表示线程数目;表示CPU Cores核数(在Spark程序运行时,1个Task任务,需要1Core CPU)
集群模式:Cluster Mode
Spark程序在实际项目中,运行最多集群模式:Hadoop YARN
二、本地模式
1.环境配置
- 将编译完成spark安装包【spark-2.4.5-bin-cdh5.16.2-2.11.tgz】解压至/export/server目录
#1.解压安装包
tar -zxf /export/software/spark-2.4.5-bin-cdh5.16.2-2.11.tgz -C /export/server/
#2.创建软连接
ln -s /export/server/spark-2.4.5-bin-cdh5.16.2-2.11 /export/server/spark
- 修改配置,Spark本地模式安装
#1.切换目录
cd /export/server/spark/conf
#2.修改配置文件名称
mv spark-env.sh.template spark-env.sh
#3.修改配置文件
vim spark-env.sh
#4.添加以下内容
JAVA_HOME=/export/server/jdk
SCALA_HOME=/export/server/scala
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
- 启动HDFS集群,从HDFS上读取数据文件
#1.启动namenode
hadoop-daemon.sh start namenode
#2.启动datanode
hadoop-daemon.sh start datanode
2.运行spark-shell
本地模式运行Spark框架提供交互式命令行:spark-shell,其中本地模式LocalMode含义为:启动一个JVM Process进程,执行任务Task,使用方式如下:
--master local | local[*] | local[K] 建议 K >= 2 正整数
其中K表示启动线程数目(或CPU Core核数)
示意图如下:
本地模式启动spark-shell:
#1.进入Spark安装目录
cd /export/server/spark
#2.启动spark-shell
bin/spark-shell --master local[2]
1、Spark context Web UI available at http://node1:4040
每个Spark应用运行,提供WEB UI监控页面,默认端口号:4040
2、Spark context available as 'sc' (master = local[2], app id = local-1623470878682).
创建SparkContext类对象,名称为:sc
Spark应用程序入口,读取数据和调度Job执行
3、Spark session available as 'spark'.
Spark2.0开始,提供新的程序入口:SparkSession,创建对象,名称为:spark
测试:将$SPARK_HOME/README.md文件上传到HDFS目录/datas,使用SparkContext读取
#1.上传HDFS文件
hdfs dfs -mkdir -p /datas/
hdfs dfs -put /export/server/spark/README.md /datas
#2.读取文件
val datasRDD = sc.textFile("/datas/README.md")
#3.条目数
datasRDD.count
#4.获取第一条数据
datasRDD.first
3.Spark WordCount
大数据框架经典案例:词频统计WordCount,从文件读取数据,统计单词个数。
例子:
val list = List("spark hadoop spark", "spark spark hive hadoop")
val wordList = list.flatMap(line => line.split("\\\\s+"))
val tupleList = wordList.map(word => (word, 1))
val tupleMap = tupleList.groupBy(tuple => tuple._1)
val wc = tupleMap.map{case (word, list) =>
val count = list.map(tuple => tuple._2).sum
word -> count
}
使用Spark编程实现,分为三个步骤:
- 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中
- 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
- 第三步、将最终处理结果RDD保存到HDFS或打印控制台
#1.创建文件
vim wordcount.data
#2.添加内容如下
spark spark hive hive spark hive
hadoop sprk spark
#3.上传HDFS
hdfs dfs -put wordcount.data /datas
Scala编程实现如下:
#1.读取HDFS文本数据,封装到RDD集合中,文本中每条数据就是集合中每条数据
val inputRDD = sc.textFile("/datas/wordcount.data")
#2.将集合中每条数据按照分隔符分割,使用正则:https://www.runoob.com/regexp/regexp-syntax.html
val wordsRDD = inputRDD.flatMap(line => line.split("\\\\s+"))
#3.转换为二元组,表示每个单词出现一次
val tuplesRDD = wordsRDD.map(word => (word, 1))
#4.按照Key分组,对Value进行聚合操作, scala中二元组就是Java中Key/Value对,reduceByKey:先分组,再聚合
val wordcountsRDD = tuplesRDD.reduceByKey((tmp, item) => tmp + item)
#5.查看结果
wordcountsRDD.take(5)
#6.保存结果数据到HDFs中
wordcountsRDD.saveAsTextFile("/datas/spark-wc")
#7.查结果数据
hdfs dfs -text /datas/spark-wc/par*
4.WEB UI监控
每个Spark Application应用运行时,启动WEB UI监控页面,默认端口号为4040,使用浏览器打开页面,如下:
点击Job 3,进入到此Job调度界面,通过DAG图展示:
5.运行圆周率PI
Spark框架自带的案例Example中涵盖圆周率PI计算程序,可以使用
$SPARK_HOME/bin/spark-submit 提交应用执行,运行在本地模式。
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \\
--master local[2] \\
--class org.apache.spark.examples.SparkPi \\
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \\
10
三、Spark Standalone集群
1.Standalone 架构
Standalone模式是Spark自带的一种集群模式,Standalone模式是真实地在多个机器之间搭建Spark集群的环境。
Standalone集群使用了分布式计算中的master-slave模型,master是集群中含有Master进程的节点,slave是集群中的Worker节点含有Executor进程。
Spark Standalone集群,类似Hadoop YARN,管理集群资源和调度资源:
- 主节点Master:类似ResourceManager
- 管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务
- 从节点Workers:类似NodeManager
- 管理每个机器的资源,分配对应的资源来运行Task;
- 每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
- 历史服务器HistoryServer:类似MRJobHistoryServer
- Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息
2.解压、配置环境变量
解压Spark编译安装包至【/export/server/】目录下,进入【conf】目录,配置环境变量
#1.解压软件包
tar -zxf /export/software/spark-2.4.5-bin-cdh5.16.2-2.11.tgz -C /export/server/
#2.创建软连接,方便后期升级
ln -s /export/server/spark-2.4.5-bin-cdh5.16.2-2.11 /export/server/spark
#3.进入配置目录
cd /export/server/spark/conf
#4.修改配置文件名称
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
#5.添加内容如下:
JAVA_HOME=/export/server/jdk
SCALA_HOME=/export/server/scala
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
将【$SPARK_HOME/conf/slaves.template】名称命名为【slaves】,填写从节点名称
1.进入配置目录
cd /export/server/spark/conf
2.修改配置文件名称
mv slaves.template slaves
vim slaves
3.内容如下:
node1
node2
node3
配置Master、Workers、HistoryServer
在配置文件【$SPARK_HOME/conf/spark-env.sh】添加如下内容:
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/spark/eventLogs/
-Dspark.history.fs.cleaner.enabled=true"
创建EventLogs存储目录
启动HDFS服务,创建应用运行事件日志目录,命令如下:
hadoop-daemon.sh start namenode
hadoop-daemons.sh start datanode
hdfs dfs -mkdir -p /spark/eventLogs/
配置Spark应用保存EventLogs
将【$SPARK_HOME/conf/spark-defaults.conf.template】名称命名为【spark-defaults.conf】,填写如下内容:
#1.进入配置目录
cd /export/server/spark/conf
#2.修改配置文件名称
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
#3.添加内容如下:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/spark/eventLogs/
spark.eventLog.compress true
设置日志级别
将【$SPARK_HOME/conf/log4j.properties.template】名称命名为【log4j.properties】,修改级别为警告WARN。
#1.进入目录
cd /export/server/spark/conf
#2.修改日志属性配置文件名称
mv log4j.properties.template log4j.properties
#3.改变日志级别
vim log4j.properties
分发到集群所有机器
将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:
cd /export/server/
scp -r spark-2.4.5-bin-cdh5.16.2-2.11 root@node2:$PWD
scp -r spark-2.4.5-bin-cdh5.16.2-2.11 root@ node3:$PWD
## 远程连接到node2和node3机器,创建软连接
ln -s /export/server/spark-2.4.5-bin-cdh5.16.2-2.11 /export/server/spark
启动服务进程
在Master节点node1上启动,进入$SPARK_HOME,必须配置主节点到所有从节点的SSH无密钥登录,集群各个机器时间同步。
- 主节点Master启动命令
/export/server/spark/sbin/start-master.sh
- WEB UI页面地址:http://node1:8080
- 从节点Workers启动命令:
/export/server/spark/sbin/start-slaves.sh
历史服务器HistoryServer:
/export/server/spark/sbin/start-history-server.sh
- WEB UI页面地址:http://node1:18080
提交运行圆周率
将上述运行在Local Mode的圆周率PI程序,运行在Standalone集群上,修改【–master】地址
为Standalone集群地址:spark://node1:7077,具体命令如下:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \\
--master spark://node1:7077 \\
--class org.apache.spark.examples.SparkPi \\
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \\
10
3.Spark 应用架构
从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors。
1.Driver Program
相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
一个SparkApplication仅有一个;
2.Executors
相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,
一个Task运行需要1 Core CPU,所有可以认为Executor中线程数就等于CPU Core核数;
一个Spark Application可以有多个,可以设置个数和资源信息
4.WEB UI 监控
Spark 提供了多个监控界面,当运行Spark任务后可以直接在网页对各种信息进行监控查看。运行spark-shell交互式命令在Standalone集群上,命令如下:
/export/server/spark/bin/spark-shell --master spark://node1:7077
scala> val dataRDD=sc.textFile("/datas/wc.data")
scala> val wordcountsRDD=dataRDD.flatMap(line=>line.split("\\\\s+")).map(word=>(word,1)).reduceByKey((tmp,item)=>tmp+item)
scala> wordcountsRDD.take(5)
可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行
按照DAG图进行的。
其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU
Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:
-
Task:被分配到各个 Executor 的单位工作内容,它是 Spark 中的最小执行单位,一般来说有多少个 Paritition(物理层面的概念,即分支可以理解为将数据划分成不同部分并行处理),就会有多少个 Task,每个 Task 只会处理单一分支上的数据。
-
Job:由多个 Task 的并行计算部分,一般 Spark 中的 action 操作(如 save、collect,后面
进一步说明),会生成一个 Job。 -
Stage:Job 的组成单位,一个 Job 会切分成多个 Stage,Stage 彼此之间相互依赖顺序执行,
而每个 Stage 是多个 Task 的集合,类似 map 和 reduce stage。
5.Standalone HA
Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群
一样,存在着Master单点故障(SPOF)的问题
高可用HA
如何解决这个单点故障的问题,Spark提供了两种方案:
-
基于文件系统的单点恢复(Single-Node Recovery with Local File System);
-
基于Zookeeper的Standby Masters(Standby Masters with ZooKeeper);
ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示:
基于Zookeeper实现HA -
停止Standalone集群
#在node1上执行命令
/export/server/spark/sbin/stop-master.sh
/export/server/spark/sbin/stop-slaves.sh
- 增加Zookeeper配置
对Spark配置文件【$SPARK_HOME/conf/spark-env.sh】文件如下修改:
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node1:2181,node22181,node3:2181
-Dspark.deploy.zookeeper.dir=/spark-ha"
参数含义说明:
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息
注释或删除MASTER_HOST内容:
#SPARK_MASTER_HOST=node1
将spark-env.sh分发集群
cd /export/server/spark/conf
scp -r spark-env.sh root@node2:$PWD
scp -r spark-env.sh root@node3:$PWD
启动集群服务
先启动Zookeeper集群,再分别启动2个Master服务,最后启动Worker服务
#1.启动ZOOKEEPER服务
zookeeper-daemons.sh start
#2.在node1和node2分别启动Master服务
/export/server/spark/sbin/start-master.sh
#3.查看哪个Master为Active,就在哪个Master机器上启动Workers服务
/export/server/spark/sbin/start-slaves.sh
测试运行
Standalone HA集群运行应用时,指定ClusterManager参数属性为:
--master spark://host1:port1,host2:port2
提交圆周率PI运行集群,命令如下:
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \\
--master spark://node1:7077,node2:7077 \\
--class org.apache.spark.examples.SparkPi \\
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \\
100
以上是关于Spark:基础知识01的主要内容,如果未能解决你的问题,请参考以下文章
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段