一Spark概述

Posted 大数据开发笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一Spark概述相关的知识,希望对你有一定的参考价值。

一、 Spark 是什么

  1. 定义

    Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

  2. 官方网址:

    http://spark.apache.org

    https://databricks.com/spark/about

  3. Spark 四大特点

    1、速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)

    spark中的job中间结果可以不落地,可以存放在内存中。mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。

    2、易用性(可以通过java/scala/python/R开发spark应用程序)

    3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)

    4、兼容性(spark程序可以运行在standalone/yarn/mesos)

  4. Spark 框架模块

    整个Spark 框架模块包含:Spark Coke、 Spark SQL、 Spark Streaming、 Spark GraphX、

    Spark MLlib,而后四项的能力都是建立在核心引擎之上 。

    image-20201206162614667
二、 Spark 运行模式

Spark 框架编写的应用程序可以运行在本地模式(Local Mode)、集群模式(Cluster Mode)

和云服务(Cloud),方便开发测试和生产部署。

  1. 第一、本地模式:Local Mode

    将Spark 应用程序中任务Task运行在一个本地JVM Process进程中,通常开发测试使用。

    一、Spark概述
    image-20201206163228645
  2. 集群模式:Cluster Mode

    将Spark应用程序运行在集群上,比如Hadoop YARN集群,Spark 自身集群Standalone及Apache Mesos集群,网址:http://spark.apache.org/docs/2.4.3/

    Hadoop YARN集群模式(生产环境使用):运行在 yarn 集群之上,由 yarn 负责资源管

    理,Spark 负责任务调度和计算,好处:计算资源按需伸缩,集群利用率高,共享底层存

    储,避免数据跨集群迁移。

    Spark Standalone集群模式(开发测试及生成环境使用):类似Hadoop YARN架构,典型

    的Mater/Slaves模式,使用Zookeeper搭建高可用,避免Master是有单点故障的。

    Apache Mesos集群模式(国内使用较少):运行在 mesos 资源管理器框架之上,由 mesos

    负责资源管理,Spark 负责任务调度和计算

  3. 、云服务:Kubernetes 模式

    中小公司未来会更多的使用云服务,Spark 2.3开始支持将Spark 开发应用运行到K8s上。

三、运行spark-shell

本地模式运行Spark框架提供交互式命令行:spark-shell,其中本地模式LocalMode含义为:

启动一个JVM Process进程,执行任务Task,使用方式如下:

--master local | local[*] | local[K] 建议 K >= 2 正整数
  • spark 中Task以线程Thread方式运行

  • 每个Task运行需要1 Core CPU

本地模式启动spark-shell:

进入Spark安装目录

cd ${SPARK_HOME}

启动spark-shell

bin/spark-shell --master local[2]
四、 词频统计WordCount
  1. 使用Spark编程实现,分为三个步骤:

    第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中

    第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey

    第三步、将最终处理结果RDD保存到HDFS或打印控制台

  2. Scala中reduce函数使用案例如下:

    一、Spark概述
    image-20201206182413979
  3. 在Spark数据结构RDD中reduceByKey函数,相当于MapReduce中shuffle和reduce函数合在一起:

    按照Key分组,将相同Value放在迭代器中,再使用reduce函数对迭代器中数据聚合。

## 读取HDFS文本数据,封装到RDD集合中,文本中每条数据就是集合中每条数据val inputRDD = sc.textFile("/datas/wordcount.data")## 将集合中每条数据按照分隔符分割,使用正则:https://www.runoob.com/regexp/regexp-syntax.htmlval wordsRDD = inputRDD.flatMap(line => line.split("\\s+"))## 转换为二元组,表示每个单词出现一次val tuplesRDD = wordsRDD.map(word => (word, 1))# 按照Key分组,对Value进行聚合操作, scala中二元组就是Java中Key/Value对## reduceByKey:先分组,再聚合val wordcountsRDD = tuplesRDD.reduceByKey((tmp, item) => tmp + item)## 查看结果wordcountsRDD.take(5)## 保存结果数据到HDFs中wordcountsRDD.saveAsTextFile("/datas/spark-wc")## 查结果数据hdfs dfs -text /datas/spark-wc/par*
五、WordCount Java版
  1. pom.xml

 <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.wordcount程序

// 创建Spark运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    // 创建Spark上下文环境对象(连接对象)
    val sc : SparkContext = new SparkContext(sparkConf)

    // 读取文件数据
    val fileRDD: RDD[String] = sc.textFile("word.txt")

    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )

    // 转换数据结构 word => (word, 1)
    val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))

    // 将转换结构后的数据按照相同的单词进行分组聚合
    val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)

    // 将数据聚合结果采集到内存中
    val word2Count: Array[(String, Int)] = word2CountRDD.collect()

    // 打印结果
    word2Count.foreach(println)

    //关闭Spark连接
    sc.stop()
六、Spark的运行环境

1.local 模式

安装环境

解压spark安装包

tar -zxvf spark-2.0.2-scala-2.11.tgz -C /software/

启动Local环境

cd /${SPARK_HOME}/bin
spark-shell --master local[*]

启动成功后,可以输入网址进行Web UI监控页面访问

node01:4040

一、Spark概述
image-20200818101952875

wordcount程序

sc.textFile("/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

应用提交执行

spark-submit --class org.apache.spark.examples.SparkPi --master local[2] /software/spark-2.0.2/examples/jars/spark-examples_2.11-2.0.2.jar 10
  • --class表示要执行程序的主类

  • --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量

  • spark-examples_2.12-2.4.5.jar 运行的应用类所在的jar包

  • 数字10表示程序的入口参数,用于设定当前应用的任务数量

2.Standalone模式
local本地模式毕竟只是用来进行练习演示的,真实工作中还是要将应用提交到对应的集群中去执行,这里我们来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。
集群规划:

node01 node02 node03
Worker  Master Worker Worker

环境搭建

进入解压缩后路径的conf目录,修改slaves.template文件名为slaves

       mv slaves.template slaves  

修改slaves文件,添加work节点

node01
node01
node03

修改spark-env.sh.template文件名为spark-env.sh

   mv spark-env.sh.template spark-env.sh

修改spark-env.sh文件,添加JAVA_HOME环境变量和集群对应的master节点

export JAVA_HOME=/opt/module/jdk1.8.0_144
SPARK_MASTER_HOST=node01
SPARK_MASTER_PORT=7077

7077端口,相当于hadoop3内部通信的8020端口
    分发spark-standalone目录到node01,node02,node03

启动集群

sbin/start-all.sh

查看master监控界面

master:8080

一、Spark概述
image-20200818103727357

提交应用

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
/software
/spark-2.0.2/examples/jars/spark-examples_2.11-2.0.2.jar \
10

1) --class表示要执行程序的主类

2) --master spark://node01:7077 独立部署模式,连接到Spark集群

3) spark-examples_2.12-2.4.5.jar 运行类所在的jar包

4) 数字10表示程序的入口参数,用于设定当前应用的任务数量

执行任务时,会产生多个Java进程

一、Spark概述
image-20200818104405349

执行任务时,默认采用服务器集群节点的总核数,每个节点内存1024M。

一、Spark概述
image-20200818104622736

提交参数说明

参数 解释 可选值举例
--class Spark程序中包含主函数的类
--master Spark程序运行的模式 本地模式:local[*]、spark://linux1:7077、Yarn
--executor-memory 1G 指定每个executor可用内存为1G 符合集群内存配置即可,具体情况具体分析。
--total-executor-cores 2 指定所有executor使用的cpu核数为2个
--executor-cores 指定每个executor使用的cpu核数
application-jar 打包好的应用jar,包含依赖。这个URL在集群中全局可见。比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
application-arguments 传给main()方法的参数

配置历史服务

1、修改spark-defaults.conf

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://node01:9000/data/sparkhistory

如果HDFS不存在该目录,建立该目录

2、修改spark-env.conf

#配置java环境变量
export JAVA_HOME=/software/jdk1.8.0_201
#配置master的地址
#export SPARK_MASTER_HOST=node01
#配置master的端口
export SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 
-Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node01:9000/data/sparkhistory -Dspark.history.retainedApplications=30"

3、重新启动集群和历史服务

start-all.sh
start-history-server.sh

4、提交任务

spark-submit --class org.apache.spark.examples.SparkPi --master spark://node01:7077 /software/spark-2.0.2/examples/jars/spark-examples_2.11-2.0.2.jar 10

5、查看历史服务:http://node01:18080

一、Spark概述
image-20200818135433693

用到的命令:

 hdfs haadmin -failover --forcefence --forceactive nn2 nn1
配置高可用

1.修改/spark-env.sh

#配置java环境变量
export JAVA_HOME=/software/jdk1.8.0_201
#配置master的地址
#export SPARK_MASTER_HOST=node01
#配置master的端口
export SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 
-Dspark.deploy.zookeeper.dir=/spark"

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node01:9000/data/sparkhistory -Dspark.history.retainedApplications=30"

2、分发配置文件到其他节点,重启集群

3、启动linux2的单独Master节点,此时linux2节点Master状态处于备用状态

start-master.sh 

4、登录node02:8989查看master为8989

image-20200818141152161

5、提交任务到高可用集群

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077,node02:7077 \
--deploy-mode cluster \
/software
/spark-2.0.2/examples/jars/spark-examples_2.11-2.0.2.jar \
10

6、高可用方式启动集群

spark-shell --master spark://node01:7077,node02:7077,node03:7077
yarn模式

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。

1、修改配置文件

1) 修改conf/spark-env.sh,添加JAVA_HOME和YARN_CONF_DIR配置

#配置java环境变量
export JAVA_HOME=/software/jdk1.8.0_201
#配置master的地址
#export SPARK_MASTER_HOST=node01
#配置master的端口
#export SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 
-Dspark.deploy.zookeeper.dir=/spark"

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://node01:9000/data/sparkhistory -Dspark.history.retainedApplications=30"

2、重启集群

stop-all.sh
start-all.sh

3、提交任务测试

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
/software/spark-2.0.2/examples/jars/spark-examples_2.11-2.0.2.jar \
10

4、查看作业执行情况http://node01:8088

image-20200818150806566
三种模式对比
模式 Spark安装机器数 需启动的进程 所属者 应用场景
Local 1 Spark 测试
Standalone 3 Master及Worker Spark 单独部署
Yarn 1 Yarn及HDFS Hadoop 混合部署
spark的端口号
  1. Spark查看当前Spark-shell运行任务情况端口号:4040(计算)

  2. Spark Master内部通信服务端口号:7077

  3. Standalone模式下,Spark Master Web端口号:8080(资源)

  4. Spark历史服务器端口号:18080

  5. Hadoop YARN任务运行情况查看端口号:8088


以上是关于一Spark概述的主要内容,如果未能解决你的问题,请参考以下文章

Spark闭包与序列化

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

Reactreact概述组件事件

一Spark概述

Spark基础学习笔记32:Spark Streaming概述

Spark概述