Spark环境与RDD

Posted 草人王亿火

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark环境与RDD相关的知识,希望对你有一定的参考价值。

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

Spark内置模块

Spark Core:实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。

Spark SQL:是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的HQL来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。

Spark Streaming:是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。

Spark MLlib:提供常见的机器学习功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。

Spark GraphX:主要用于图形并行计算和图挖掘系统的组件。

集群管理器:Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度器,叫作独立调度器。

运行模式

Local模式:在本地部署单个Spark服务

Standalone模式:Spark自带的任务调度模式。

YARN模式:Spark使用Hadoop的YARN组件进行资源与任务调度。

Mesos模式:Spark使用Mesos平台进行资源与任务的调度。

安装地址

Local模式

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。

上传并解压Spark安装包tar -zxvfspark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/mvspark-2.1.1-bin-hadoop2.7 spark-local
bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master local[2] \./examples/jars/spark-examples_2.11-2.1.1.jar\10##########--class:表示要执行程序的主类;--masterlocal[2]local: 没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算local[K]:指定使用K个Core来运行计算,比如local[2]就是运行2个Core来执行local[*]: 自动帮你按照CPU最多核来设置线程数。比如CPU有4核,Spark帮你自动设置4个线程计算。

spark-shell模式Spark context Web UI available at http://node1:4040

sc是SparkCore程序的入口;spark是SparkSQL程序入口;master = local[*]表示本地模式运行。

运行任务方式说明:

spark-submit,是将jar上传到集群,执行Spark任务;

spark-shell,相当于命令行工具,本身也是一个Application。

说明:本地模式下,默认的调度器为FIFO。

集群角色

Master和Worker

Spark环境与RDD

Driver和Executor

Spark环境与RDD

通用运行流程

总结:Master和Worker是Spark的守护进程,即Spark在特定模式下正常运行所必须的进程。Driver和Executor是临时程序,当有具体任务提交到Spark集群才会开启的程序。

Standalone模式

Standalone模式是Spark自带的资源调动引擎,构建一个由Master + Slave构成的Spark集群,Spark运行在集群中。

这个要和Hadoop中的Standalone区别开来。这里的Standalone是指只用Spark来搭建一个集群,不需要借助其他的框架。是相对于Yarn和Mesos来说的。

使用
bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master spark://node1:7077 \--executor-memory 2G \--total-executor-cores 2 \./examples/jars/spark-examples_2.11-2.1.1.jar\10

--master spark://hadoop102:7077指定要连接的集群的master

配置Executor可用内存为2G,使用CPU核数为2个

http://IP:8080/,发现执行本次任务,默认采用三台服务器节点的总核数24核,每个节点内存1024M。

8080:master的webUI

4040:application的webUI的端口号

参数说明
 参数    解释    可选值举例  
 --class    Spark程序中包含主函数的类  
 --master    Spark程序运行的模式    本地模式:local[*]、spark://hadoop102:7077、  Yarn  
 --executor-memory  1G    指定每个executor可用内存为1G    符合集群内存配置即可,具体情况具体分析。  
 --total-executor-cores  2    指定所有executor使用的cpu核数为2个  
 application-jar    打包好的应用jar,包含依赖。这个URL在集群中全局可见。比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar  
 application-arguments    传给main()方法的参数  
配置历史服务

由于spark-shell停止掉后,node1:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。

1、修改spark-default.conf.template名称mv spark-defaults.conf.template spark-defaults.conf2、修改spark-default.conf文件,配置日志存储路径,并分发vi spark-defaults.confspark.eventLog.enabled truespark.eventLog.dir hdfs://node1:8020/directory xsync spark-defaults.conf#HDFS上的目录需要提前存在sbin/start-dfs.shhadoop fs -mkdir /directory3、修改spark-env.sh文件vi spark-env.sh
exportSPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node1:8020/directory -Dspark.history.retainedApplications=30"# 参数1含义:WEBUI访问的端口号为18080# 参数2含义:指定历史服务器日志存储路径# 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。4、分发配置文件xsync spark-env.sh5、启动历史服务sbin/start-history-server.sh6、再次执行任务bin/spark-submit \--classorg.apache.spark.examples.SparkPi \--masterspark://node1:7077 \--executor-memory1G \--total-executor-cores2 \./examples/jars/spark-examples_2.11-2.1.1.jar\107、查看Spark历史服务地址:node1:18080
HA

Spark环境与RDD

配置高可用

1、停止集群sbin/stop-all.shZookeeper正常安装并启动zk.sh start2、修改spark-env.sh文件添加如下配置vi spark-env.sh注释掉如下内容:#SPARK_MASTER_HOST=node1#SPARK_MASTER_PORT=7077添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1,node2,node3 -Dspark.deploy.zookeeper.dir=/spark"\3、分发配置文件xsyncspark-env.sh4、启动全部节点sbin/start-all.sh5、在hadoop103上单独启动master节点sbin/start-master.sh
运行流程

Spark有standalone-client和standalone-cluster两种模式,主要区别在于:Driver程序的运行节点。

1、客户端模式

bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master spark://node1:7077,node2:7077 \--executor-memory 2G \--total-executor-cores 2 \--deploy-mode client \./examples/jars/spark-examples_2.11-2.1.1.jar\10

--deploy-mode client,表示Driver程序运行在本地客户端

2、集群模式模式

bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master spark://node1:7077,node2:7077 \--executor-memory 2G \--total-executor-cores 2 \--deploy-mode cluster \./examples/jars/spark-examples_2.11-2.1.1.jar\10

--deploy-mode cluster,表示Driver程序运行在集群

Yarn模式

Spark客户端直接连接Yarn,不需要额外构建Spark集群。

安装使用
1、解压tar -zxvfspark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/mvspark-2.1.1-bin-hadoop2.7/ spark-yarn2、修改hadoop配置文件/opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml,因为测试环境内存较少,防止执行过程进行被意外杀死vi yarn-site.xml<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true--><property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value></property><!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true--><property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value></property><!--Spark2中jersey版本是2.22,但是yarn中还需要依赖1.9,版本不兼容--><property> <name>yarn.timeline-service.enabled</name> <value>false</value></property> 3、分发配置文件xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml4、修改/opt/module/spark/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径mv spark-env.sh.template spark-env.shvi spark-env.shYARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop5、分发spark-yarnxsync spark-yarn6、启动HDFS以及YARN集群sbin/start-dfs.shsbin/start-yarn.sh7、testbin/spark-submit\--class org.apache.spark.examples.SparkPi\--master yarn \./examples/jars/spark-examples_2.11-2.1.1.jar\10参数:--master yarn,表示Yarn方式运行;8、node1:8088页面,点击History,查看历史页面
配置历史服务

需要针对Yarn模式,配置一下历史服务器。

1、修改spark-default.conf.template名称 mv spark-defaults.conf.template spark-defaults.conf2、修改spark-default.conf文件,配置日志存储路径,并分发vi spark-defaults.confspark.eventLog.enabled truespark.eventLog.dir hdfs://node1:9000/directoryxsync spark-defaults.conf3、修改spark-env.sh文件export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory -Dspark.history.retainedApplications=30"# 参数1含义:WEBUI访问的端口号为18080# 参数2含义:指定历史服务器日志存储路径# 参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。4、分发配置文件xsyncspark-env.sh
配置查看历史日志

为了从Yarn上关联到Spark历史服务器,需要配置关联路径

1、修改配置文件/opt/module/spark/conf/spark-defaults.conf添加如下内容:spark.yarn.historyServer.address=hadoop102:18080spark.history.ui.port=180802、同步spark-defaults.conf配置文件xsyncspark-defaults.conf3、重启Spark历史服务sbin/stop-history-server.sh sbin/start-history-server.sh 4、提交任务到Yarn执行bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master yarn \./examples/jars/spark-examples_2.11-2.1.1.jar\105、Web页面查看日志:http://node1:8088/cluster
运行流程

Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。

yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster适用于生产环境。

1、客户端模式(默认)

bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master yarn \--deploy-mode client \./examples/jars/spark-examples_2.11-2.1.1.jar\10

Spark环境与RDD

2、集群模式

bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master yarn \--deploy-mode cluster \./examples/jars/spark-examples_2.11-2.1.1.jar\10

Spark环境与RDD

如果在 yarn 日志端无法查看到具体的日志, 则在yarn-site.xml中添加如下配置并启动Yarn历史服务器

<property> <name>yarn.log.server.url</name> <value>http://hadoop204:19888/jobhistory/logs</value></property>

注意:hadoop历史服务器也要启动

mr-jobhistory-daemon.sh start historyserver

Mesos模式

Spark客户端直接连接Mesos;不需要额外构建Spark集群。

几种模式对比

 模式  Spark安装机器数    需启动的进程    所属者  
 Local    1    无    Spark  
 Standalone    3    Master及Worker    Spark  
 Yarn    1    Yarn及HDFS    Hadoop  
端口号总结
  • Spark历史服务器端口号:18080         (类比于Hadoop历史服务器端口号:19888)

  • SparkMaster Web端口号:8080(类比于Hadoop的NameNode Web端口号:50070))

  • SparkMaster内部通信服务端口号:7077    (类比于Hadoop的9000)

  • Spark查看当前Spark-shell运行任务情况端口号:4040

  • HadoopYARN任务运行情况查看端口号:8088


RDD概述

什么是RDD

Spark环境与RDD

特性

1、A list of partitions多个分区,分区可以看成是数据集的基本组成单位对于RDD来说,每个分区都会被一个计算任务处理,并决定了并行计算的粒度。用户可以在创建RDD时指定RDD的分区数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core 的数目.每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
2、A function for computing each split计算每个切片(分区)的函数.Spark 中RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
3、A list of dependencies on other RDDs与其他 RDD 之间的依赖关系RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
4、Optionally, a Partitioner for key-value RDDs (e.g. to say thatthe RDD is hash-partitioned)对存储键值对的 RDD,还有一个可选的分区器只有对于 key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner的值是None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量
5、Optionally, a list of preferred locations to compute each split on(e.g. block locations for an HDFS file)存储每个切片优先(preferred location)位置的列表比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.

RDD的创建

RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。

从集合中创建

val list: List[Int] = List(1,2,3,4)//根据集合创建RDD 方式一val rdd: RDD[Int] = sc.parallelize(list)
//方式二:makeRDD,底层调用的就是parallelize(seq, numSlices)val rdd: RDD[Int] = sc.makeRDD(list)

从外部存储系统的数据集创建

val rdd: RDD[String] = sc.textFile("D:\\dev\\input\\1.txt")//从HDFS服务器上读取数据,创建RDDval rdd: RDD[String] = sc.textFile("hdfs://node1:9000/input")

从其他RDD创建,主要是通过一个RDD运算完后,再产生新的RDD。

分区规则
RDD数据从集合中创建
//集合中4个数据---设置分区数为3---实际输出3个分区---分区中数据分布 0分区->1, 1分区->2, 3分区->3 4//val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),3)

分区数、划分条件

 若未设置参数,默认分区取决于分配给应用的CPU的核数 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)
length:数据长度 numSlices:分区数def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } }

RDD数据从文件中读取后创建
val rdd: RDD[String] = sc.textFile("D:\\dev\\input\\3.txt",5)rdd.saveAsTextFile("D:\\dev\\output")

分区数、划分条件

 /** * Default min number of partitions for Hadoop RDDs when not given by user * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2. * The reasons for this are discussed in https://github.com/mesos/spark/pull/718 */ //没有指定分区时,默认规则如下 defaultParallelism是分配给应用的cores def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
//指定分区时* >1.在textFile方法中,第二个参数minPartitions,表示最小分区数* 注意:是最小,不是实际的分区个数* >2.在实际计算分区个数的时候,会根据文件的总大小和最小分区数进行相除运算* &如果余数为0* 那么最小分区数,就是最终实际的分区数* &如果余数不为0* 那么实际的分区数,要计算//totalSize:文件总大小 numSplits:分区数long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); //切片规则,返回goalSize, blockSize较小值 //protected long computeSplitSize(long goalSize, long minSize,long blockSize) { // return Math.max(minSize, Math.min(goalSize, blockSize));} //计算切片 long splitSize = computeSplitSize(goalSize, minSize, blockSize);  long bytesRemaining = length; //SPLIT_SLOP=1.1,剩余>1.1就继续切 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[] splitHosts = getSplitHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts)); bytesRemaining -= splitSize; }
if (bytesRemaining != 0) { String[] splitHosts = getSplitHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts)); } }

getSplits文件返回的是切片规划,真正读取是在compute方法中创建

 override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) val inputSplits = inputFormat.getSplits(jobConf, minPartitions) //切片规划 FileInputFormat->getSplits //0 = {FileSplit@5103} "file:/D:/dev/input/test.txt:0+7" //1 = {FileSplit@5141} "file:/D:/dev/input/test.txt:7+7" //2 = {FileSplit@5181} "file:/D:/dev/input/test.txt:14+7" //3 = {FileSplit@5237} "file:/D:/dev/input/test.txt:21+1" val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } 

真正读取是在compute方法中创建LineRecordReader读取的

 public LineRecordReader(Configuration job, FileSplit split, byte[] recordDelimiter) throws IOException { this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input. LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); //切片开始位置 start = split.getStart(); //切片结束位置 end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); codec = compressionCodecs.getCodec(file); ...

以上是关于Spark环境与RDD的主要内容,如果未能解决你的问题,请参考以下文章

RDD 与Spark 生产代码的数据集

Spark——RDD算子

Spark RDD 的Transformation与Action的常用功能总结(Python版本)

Spark数据读取

RDD的概念与创建

Spark——DataFrame与RDD互操作方式