Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试相关的知识,希望对你有一定的参考价值。

一、Standalone 搭建

Standalone 集群分为 master 和 slave 两种节点,如下图所示:

搭建整体架构:

主机别名用途
192.168.40.172node1master
192.168.40.173node2worker/slave
192.168.40.174node3worker/slave

安装前需要提前安装好 hadoop 环境,配置 node1node2、node3 的免密ssh,以及集群时间同步,可以参考下面我的博客:

https://blog.csdn.net/qq_43692950/article/details/127158935

确保 hadoop 已经成功启动起来。

下载 Spark 安装包,这里我用的 3.0.1 版本:

http://spark.apache.org/downloads.html

先将下载后的安装包上传至 node1 节点,解压安装包:

tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz

进入到加压目录下的 conf 下,修改配置:

配置 slaves 添加 slave 主机 node2、node3

mv slaves.template slaves
vi slaves


下面配置 spark-env.sh 主要告诉 Spark jdkhadoop 的位置,以及主节点的主机,如果数据没有读 HDFS 中的数据可以不配置 hadoop 相关参数

mv spark-env.sh.template spark-env.sh
vi spark-env.sh
## 设置JAVA安装目录
JAVA_HOME=/usr/lib/jvm/java-1.8.0

## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要
HADOOP_CONF_DIR=/export/server/hadoop-3.1.4/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop-3.1.4/etc/hadoop

## 指定spark老大Master的IP和提交任务的通信端口
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8080

SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g


将修改好的安装包上传至 node2、node3 主机下:

scp -r /export/spark/ root@node2:/export/spark/
scp -r /export/spark/ root@node3:/export/spark/

进去到解压目录下,启动 Spark 集群:

使用一键脚本

#启动 master、slave
sbin/start-all.sh
#关闭 master、slave
sbin/stop-all.sh

单独启动:

# 启动master
sbin/start-master.sh
# 关闭master
sbin/stop-master.sh
#启动 slave
sbin/start-slaves.sh
#关闭 slave
sbin/stop-slaves.sh

主节点查看进程:

从节点查看进程:

查看web管理页面:

http://192.168.40.172:8080


使用 Spark Shell

bin/spark-shell --master spark://node1:7077


运行测试程序:

val textFile = sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
val counts = textFile.map((_,1)).reduceByKey(_ + _)
counts.collect


二、使用三种语言测试环境

1. java 和 Scala 项目

创建一个普通的Maven项目,在 pom 中添加 ScalaSpark 的依赖:

<!--依赖Scala语言-->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.11</version>
</dependency>

<!--SparkCore依赖-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

main 下面创建 scala 包,专门存放 scala 程序,java 下专门存放 java 程序:

Scala 测试程序

object RddTestScala 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName("spark")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val texts= sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
    val counts = texts.map((_,1)).reduceByKey(_ + _)
    println(counts.collectAsMap())
  

Java 测试程序

public class RddTestJava 
    public static void main(String[] args) 
        SparkConf conf = new SparkConf().setAppName("spark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        JavaRDD<String> texts= sc.parallelize(Arrays.asList("abc", "abc", "ff", "ee", "ff"));
        JavaPairRDD<String, Integer> counts = texts.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey(Integer::sum);
        System.out.println(counts.collectAsMap());
    


由于默认情况下使用 maven 编译不会编译 scala 程序,在 build 中添加 scala 的插件:

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
        <!-- 指定编译java的插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
        <!-- 指定编译scala的插件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>$project.build.directory/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.bxc.RddTestJava</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

打成 jar 包:

mvn clean package

将打包后的jar包上传至 node1 节点,进到 Spark 解压目录下:

运行 Scala 脚本

bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
--class com.bxc.RddTestScala \\
/export/spark/spark-submit-demo-1.0-SNAPSHOT.jar

运行 Java 脚本

bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
--class com.bxc.RddTestJava \\
/export/spark/spark-submit-demo-1.0-SNAPSHOT.jar

2. Python项目

编写 Python 脚本:

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName('spark')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    texts = sc.parallelize(["abc", "abc", "ff", "ee", "ff"])
    counts = texts.map(lambda s:(s, 1)).reduceByKey(lambda v1,v2:v1+v2)
    print(counts.collectAsMap())

将脚本上传至 node1 节点,,进到 Spark 解压目录下:

bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
/export/spark/RddTestPy.py

三、HA模式搭建

上面模式的缺点是 Master 节点只有一台,一旦主节点宕机,集群便不可使用,此时我们可以多启动几台主节点已达到HA模式,那么多主节点,之间的协调肯定是必不可少的,此时可依赖于zookeeper,如下图所示:

安装 zookeeper 可参考下面这篇文章:

https://blog.csdn.net/qq_43692950/article/details/110648852

修改 conf/spark-env.sh 文件:

移除上面添加的 SPARK_MASTER_HOST=node1 ,增加下面配置,如果 zookeeper是集群则逗号隔开:

SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.40.1:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"


将修改的配置分发到 node2、node3 节点:

scp -r /export/spark/spark-3.0.1-bin-hadoop2.7/conf/spark-env.sh root@node2:/export/spark/spark-3.0.1-bin-hadoop2.7/conf/
scp -r /export/spark/spark-3.0.1-bin-hadoop2.7/conf/spark-env.sh root@node3:/export/spark/spark-3.0.1-bin-hadoop2.7/conf/

重启集群:

sbin/stop-all.sh
sbin/start-all.sh

node2 节点上启动一个 master 节点:

sbin/start-master.sh


查看 node1webUIhttp://192.168.40.172:8080

查看 node2webUIhttp://192.168.40.173:8080:

查看 zookeeper 上的临时节点:


node3中进入 spark-shell 中:

bin/spark-shell --master spark://node1:7077,node2:7077

执行脚本,可以正常返回结果:

val t = sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
t.collect()

下面将 node1 节点的 master 进程强行杀掉:


下面查看 node2 上的 masterwebUI,可以看到 slave 已经转移过来:


shell 中查看原先创建的变量依然可以正常使用:

t.collect()

以上是关于Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试的主要内容,如果未能解决你的问题,请参考以下文章

HA-Spark集群环境搭建(Standalone模式)

Spark-Standalone-HA模式

:Spark环境搭建-StandAlone-HA

Spark standalone下的运行过程

大数据问题排查系列 - SPARK STANDALONE HA 模式的一个缺陷点与应对方案

关于Spark下的standalone模式的搭建