Spark环境与RDD
Posted 草人王亿火
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark环境与RDD相关的知识,希望对你有一定的参考价值。
Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Spark内置模块
Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。
Spark SQL:是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的HQL来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。
Spark Streaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。
Spark MLlib:提供常见的机器学习功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。
Spark GraphX:主要用于图形并行计算和图挖掘系统的组件。
集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。
运行模式
Local模式:在本地部署单个Spark服务
Standalone模式:Spark自带的任务调度模式。
YARN模式:Spark使用Hadoop的YARN组件进行资源与任务调度。
Mesos模式:Spark使用Mesos平台进行资源与任务的调度。
安装地址
Local模式
Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。
上传并解压Spark安装包
tar -zxvfspark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
mvspark-2.1.1-bin-hadoop2.7 spark-local
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master local[2] \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
##########
--class:表示要执行程序的主类;
--masterlocal[2]
local: 没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算
local[K]:指定使用K个Core来运行计算,比如local[2]就是运行2个Core来执行
local[*]: 自动帮你按照CPU最多核来设置线程数。比如CPU有4核,Spark帮你自动设置4个线程计算。
spark-shell模式Spark context Web UI available at http://node1:4040
sc是SparkCore程序的入口;spark是SparkSQL程序入口;master = local[*]表示本地模式运行。
运行任务方式说明:
spark-submit,是将jar上传到集群,执行Spark任务;
spark-shell,相当于命令行工具,本身也是一个Application。
说明:本地模式下,默认的调度器为FIFO。
集群角色
Master和Worker
Driver和Executor
通用运行流程
总结:Master和Worker是Spark的守护进程,即Spark在特定模式下正常运行所必须的进程。Driver和Executor是临时程序,当有具体任务提交到Spark集群才会开启的程序。
Standalone模式
Standalone模式是Spark自带的资源调动引擎,构建一个由Master + Slave构成的Spark集群,Spark运行在集群中。
这个要和Hadoop中的Standalone区别开来。这里的Standalone是指只用Spark来搭建一个集群,不需要借助其他的框架。是相对于Yarn和Mesos来说的。
使用
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master spark://node1:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
--master spark://hadoop102:7077指定要连接的集群的master
配置Executor可用内存为2G,使用CPU核数为2个
http://IP:8080/,发现执行本次任务,默认采用三台服务器节点的总核数24核,每个节点内存1024M。
8080:master的webUI
4040:application的webUI的端口号
参数说明
参数 | 解释 | 可选值举例 |
---|---|---|
--class | Spark程序中包含主函数的类 | |
--master | Spark程序运行的模式 | 本地模式:local[*]、spark://hadoop102:7077、 Yarn |
--executor-memory 1G | 指定每个executor可用内存为1G | 符合集群内存配置即可,具体情况具体分析。 |
--total-executor-cores 2 | 指定所有executor使用的cpu核数为2个 | |
application-jar | 打包好的应用jar,包含依赖。这个URL在集群中全局可见。比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar | |
application-arguments | 传给main()方法的参数 |
配置历史服务
由于spark-shell停止掉后,node1:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。
1、修改spark-default.conf.template名称
mv spark-defaults.conf.template spark-defaults.conf
2、修改spark-default.conf文件,配置日志存储路径,并分发
vi spark-defaults.conf
true
hdfs://node1:8020/directory
xsync spark-defaults.conf
#HDFS上的目录需要提前存在
sbin/start-dfs.sh
hadoop fs -mkdir /directory
3、修改spark-env.sh文件
vi spark-env.sh
exportSPARK_HISTORY_OPTS="
18080 =
hdfs://node1:8020/directory =
30" =
# 参数1含义:WEBUI访问的端口号为18080
# 参数2含义:指定历史服务器日志存储路径
# 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
4、分发配置文件
xsync spark-env.sh
5、启动历史服务
sbin/start-history-server.sh
6、再次执行任务
\
--classorg.apache.spark.examples.SparkPi \
--masterspark://node1:7077 \
--executor-memory1G \
--total-executor-cores2 \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
18080 :
HA
配置高可用
1、停止集群
sbin/stop-all.sh
Zookeeper正常安装并启动
zk.sh start
2、修改spark-env.sh文件添加如下配置
vi spark-env.sh
注释掉如下内容:
#SPARK_MASTER_HOST=node1
#SPARK_MASTER_PORT=7077
添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node1,node2,node3
-Dspark.deploy.zookeeper.dir=/spark"\
3、分发配置文件
xsyncspark-env.sh
4、启动全部节点
sbin/start-all.sh
5、在hadoop103上单独启动master节点
sbin/start-master.sh
运行流程
Spark有standalone-client和standalone-cluster两种模式,主要区别在于:Driver程序的运行节点。
1、客户端模式
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master spark://node1:7077,node2:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
--deploy-mode client,表示Driver程序运行在本地客户端
2、集群模式模式
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master spark://node1:7077,node2:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
--deploy-mode cluster,表示Driver程序运行在集群
Yarn模式
Spark客户端直接连接Yarn,不需要额外构建Spark集群。
安装使用
1、解压
tar -zxvfspark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
mvspark-2.1.1-bin-hadoop2.7/ spark-yarn
2、修改hadoop配置文件/opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml,因为测试环境内存较少,防止执行过程进行被意外杀死
vi yarn-site.xml
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<!--Spark2中jersey版本是2.22,但是yarn中还需要依赖1.9,版本不兼容-->
<property>
<name>yarn.timeline-service.enabled</name>
<value>false</value>
</property>
3、分发配置文件
xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml
4、修改/opt/module/spark/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
5、分发spark-yarn
xsync spark-yarn
6、启动HDFS以及YARN集群
sbin/start-dfs.sh
sbin/start-yarn.sh
7、test
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
参数:--master yarn,表示Yarn方式运行;
8、node1:8088页面,点击History,查看历史页面
配置历史服务
需要针对Yarn模式,配置一下历史服务器。
1、修改spark-default.conf.template名称
mv spark-defaults.conf.template spark-defaults.conf
2、修改spark-default.conf文件,配置日志存储路径,并分发
vi spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:9000/directory
xsync spark-defaults.conf
3、修改spark-env.sh文件
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
# 参数1含义:WEBUI访问的端口号为18080
# 参数2含义:指定历史服务器日志存储路径
# 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
4、分发配置文件
xsyncspark-env.sh
配置查看历史日志
为了从Yarn上关联到Spark历史服务器,需要配置关联路径
1、修改配置文件/opt/module/spark/conf/spark-defaults.conf
添加如下内容:
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
2、同步spark-defaults.conf配置文件
xsyncspark-defaults.conf
3、重启Spark历史服务
sbin/stop-history-server.sh
sbin/start-history-server.sh
4、提交任务到Yarn执行
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master yarn \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
5、Web页面查看日志:http://node1:8088/cluster
运行流程
Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。
yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster适用于生产环境。
1、客户端模式(默认)
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
2、集群模式
bin/spark-submit\
--class org.apache.spark.examples.SparkPi\
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar\
10
如果在 yarn 日志端无法查看到具体的日志, 则在yarn-site.xml中添加如下配置并启动Yarn历史服务器
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop204:19888/jobhistory/logs</value>
</property>
注意:hadoop历史服务器也要启动
mr-jobhistory-daemon.sh start historyserver
Mesos模式
Spark客户端直接连接Mesos;不需要额外构建Spark集群。
几种模式对比
模式 | Spark安装机器数 | 需启动的进程 | 所属者 |
---|---|---|---|
Local | 1 | 无 | Spark |
Standalone | 3 | Master及Worker | Spark |
Yarn | 1 | Yarn及HDFS | Hadoop |
端口号总结
Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888)
SparkMaster Web端口号:8080(类比于Hadoop的NameNode Web端口号:50070))
SparkMaster内部通信服务端口号:7077 (类比于Hadoop的9000)
Spark查看当前Spark-shell运行任务情况端口号:4040
HadoopYARN任务运行情况查看端口号:8088
RDD概述
什么是RDD
特性
1、A list of partitions
多个分区,分区可以看成是数据集的基本组成单位
对于RDD来说,每个分区都会被一个计算任务处理,并决定了并行计算的粒度。
用户可以在创建RDD时指定RDD的分区数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core 的数目.
每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
2、A function for computing each split
计算每个切片(分区)的函数.
Spark 中RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
3、A list of dependencies on other RDDs
与其他 RDD 之间的依赖关系
RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
4、Optionally, a Partitioner for key-value RDDs (e.g. to say thatthe RDD is hash-partitioned)
对存储键值对的 RDD,还有一个可选的分区器
只有对于 key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner的值是None;
Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量
5、Optionally, a list of preferred locations to compute each split on(e.g. block locations for an HDFS file)
存储每个切片优先(preferred location)位置的列表
比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
RDD的创建
RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
从集合中创建
val list: List[Int] = List(1,2,3,4)
//根据集合创建RDD 方式一
val rdd: RDD[Int] = sc.parallelize(list)
//方式二:makeRDD,底层调用的就是parallelize(seq, numSlices)
val rdd: RDD[Int] = sc.makeRDD(list)
从外部存储系统的数据集创建
val rdd: RDD[String] = sc.textFile("D:\\dev\\input\\1.txt")
//从HDFS服务器上读取数据,创建RDD
val rdd: RDD[String] = sc.textFile("hdfs://node1:9000/input")
从其他RDD创建,主要是通过一个RDD运算完后,再产生新的RDD。
分区规则
RDD数据从集合中创建
//集合中4个数据---设置分区数为3---实际输出3个分区---分区中数据分布 0分区->1, 1分区->2, 3分区->3 4
//val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),3)
分区数、划分条件
若未设置参数,默认分区取决于分配给应用的CPU的核数
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
length:数据长度 numSlices:分区数
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
RDD数据从文件中读取后创建
val rdd: RDD[String] = sc.textFile("D:\\dev\\input\\3.txt",5)
rdd.saveAsTextFile("D:\\dev\\output")
分区数、划分条件
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
//没有指定分区时,默认规则如下 defaultParallelism是分配给应用的cores
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
//指定分区时
* >1.在textFile方法中,第二个参数minPartitions,表示最小分区数
* 注意:是最小,不是实际的分区个数
* >2.在实际计算分区个数的时候,会根据文件的总大小和最小分区数进行相除运算
* &如果余数为0
* 那么最小分区数,就是最终实际的分区数
* &如果余数不为0
* 那么实际的分区数,要计算
//totalSize:文件总大小 numSplits:分区数
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
//切片规则,返回goalSize, blockSize较小值
//protected long computeSplitSize(long goalSize, long minSize,long blockSize) {
// return Math.max(minSize, Math.min(goalSize, blockSize));}
//计算切片
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
//SPLIT_SLOP=1.1,剩余>1.1就继续切
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
}
}
getSplits文件返回的是切片规划,真正读取是在compute方法中创建
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
//切片规划 FileInputFormat->getSplits
//0 = {FileSplit@5103} "file:/D:/dev/input/test.txt:0+7"
//1 = {FileSplit@5141} "file:/D:/dev/input/test.txt:7+7"
//2 = {FileSplit@5181} "file:/D:/dev/input/test.txt:14+7"
//3 = {FileSplit@5237} "file:/D:/dev/input/test.txt:21+1"
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
真正读取是在compute方法中创建LineRecordReader读取的
public LineRecordReader(Configuration job, FileSplit split,
byte[] recordDelimiter) throws IOException {
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
//切片开始位置
start = split.getStart();
//切片结束位置
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
...
以上是关于Spark环境与RDD的主要内容,如果未能解决你的问题,请参考以下文章