Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试相关的知识,希望对你有一定的参考价值。
一、Standalone 搭建
Standalone 集群分为 master 和 slave 两种节点,如下图所示:
搭建整体架构:
主机 | 别名 | 用途 |
---|---|---|
192.168.40.172 | node1 | master |
192.168.40.173 | node2 | worker/slave |
192.168.40.174 | node3 | worker/slave |
安装前需要提前安装好 hadoop
环境,配置 node1
到 node2、node3
的免密ssh
,以及集群时间同步,可以参考下面我的博客:
确保 hadoop
已经成功启动起来。
下载 Spark
安装包,这里我用的 3.0.1
版本:
先将下载后的安装包上传至 node1
节点,解压安装包:
tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz
进入到加压目录下的 conf
下,修改配置:
配置 slaves
添加 slave
主机 node2、node3
mv slaves.template slaves
vi slaves
下面配置 spark-env.sh
主要告诉 Spark jdk
和hadoop
的位置,以及主节点的主机,如果数据没有读 HDFS
中的数据可以不配置 hadoop
相关参数
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
## 设置JAVA安装目录
JAVA_HOME=/usr/lib/jvm/java-1.8.0
## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要
HADOOP_CONF_DIR=/export/server/hadoop-3.1.4/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop-3.1.4/etc/hadoop
## 指定spark老大Master的IP和提交任务的通信端口
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
将修改好的安装包上传至 node2、node3
主机下:
scp -r /export/spark/ root@node2:/export/spark/
scp -r /export/spark/ root@node3:/export/spark/
进去到解压目录下,启动 Spark
集群:
使用一键脚本
#启动 master、slave
sbin/start-all.sh
#关闭 master、slave
sbin/stop-all.sh
单独启动:
# 启动master
sbin/start-master.sh
# 关闭master
sbin/stop-master.sh
#启动 slave
sbin/start-slaves.sh
#关闭 slave
sbin/stop-slaves.sh
主节点查看进程:
从节点查看进程:
查看web
管理页面:
http://192.168.40.172:8080
使用 Spark Shell
:
bin/spark-shell --master spark://node1:7077
运行测试程序:
val textFile = sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
val counts = textFile.map((_,1)).reduceByKey(_ + _)
counts.collect
二、使用三种语言测试环境
1. java 和 Scala 项目
创建一个普通的Maven
项目,在 pom
中添加 Scala
和 Spark
的依赖:
<!--依赖Scala语言-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.11</version>
</dependency>
<!--SparkCore依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.1</version>
</dependency>
在 main
下面创建 scala
包,专门存放 scala
程序,java
下专门存放 java
程序:
Scala 测试程序
object RddTestScala
def main(args: Array[String]): Unit =
val conf = new SparkConf().setAppName("spark")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val texts= sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
val counts = texts.map((_,1)).reduceByKey(_ + _)
println(counts.collectAsMap())
Java 测试程序
public class RddTestJava
public static void main(String[] args)
SparkConf conf = new SparkConf().setAppName("spark");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("WARN");
JavaRDD<String> texts= sc.parallelize(Arrays.asList("abc", "abc", "ff", "ee", "ff"));
JavaPairRDD<String, Integer> counts = texts.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey(Integer::sum);
System.out.println(counts.collectAsMap());
由于默认情况下使用 maven
编译不会编译 scala
程序,在 build
中添加 scala
的插件:
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 指定编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>$project.build.directory/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.bxc.RddTestJava</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
打成 jar
包:
mvn clean package
将打包后的jar
包上传至 node1
节点,进到 Spark
解压目录下:
运行 Scala 脚本
bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
--class com.bxc.RddTestScala \\
/export/spark/spark-submit-demo-1.0-SNAPSHOT.jar
运行 Java 脚本
bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
--class com.bxc.RddTestJava \\
/export/spark/spark-submit-demo-1.0-SNAPSHOT.jar
2. Python项目
编写 Python
脚本:
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName('spark')
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
texts = sc.parallelize(["abc", "abc", "ff", "ee", "ff"])
counts = texts.map(lambda s:(s, 1)).reduceByKey(lambda v1,v2:v1+v2)
print(counts.collectAsMap())
将脚本上传至 node1
节点,,进到 Spark
解压目录下:
bin/spark-submit \\
--master spark://node1:7077 \\
--executor-memory 1G \\
--executor-cores 2 \\
/export/spark/RddTestPy.py
三、HA模式搭建
上面模式的缺点是 Master
节点只有一台,一旦主节点宕机,集群便不可使用,此时我们可以多启动几台主节点已达到HA
模式,那么多主节点,之间的协调肯定是必不可少的,此时可依赖于zookeeper
,如下图所示:
安装 zookeeper
可参考下面这篇文章:
修改 conf/spark-env.sh
文件:
移除上面添加的 SPARK_MASTER_HOST=node1
,增加下面配置,如果 zookeeper
是集群则逗号隔开:
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.40.1:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"
将修改的配置分发到 node2、node3
节点:
scp -r /export/spark/spark-3.0.1-bin-hadoop2.7/conf/spark-env.sh root@node2:/export/spark/spark-3.0.1-bin-hadoop2.7/conf/
scp -r /export/spark/spark-3.0.1-bin-hadoop2.7/conf/spark-env.sh root@node3:/export/spark/spark-3.0.1-bin-hadoop2.7/conf/
重启集群:
sbin/stop-all.sh
sbin/start-all.sh
在 node2
节点上启动一个 master
节点:
sbin/start-master.sh
查看 node1
的 webUI
:http://192.168.40.172:8080
查看 node2
的 webUI
:http://192.168.40.173:8080
:
查看 zookeeper
上的临时节点:
在node3
中进入 spark-shell
中:
bin/spark-shell --master spark://node1:7077,node2:7077
执行脚本,可以正常返回结果:
val t = sc.parallelize(Seq("abc", "abc", "ff", "ee", "ff"))
t.collect()
下面将 node1
节点的 master
进程强行杀掉:
下面查看 node2
上的 master
的webUI
,可以看到 slave
已经转移过来:
shell
中查看原先创建的变量依然可以正常使用:
t.collect()
以上是关于Spark - Standalone搭建及HA模式搭建,并使用 ScalaJavaPython 三种语言测试的主要内容,如果未能解决你的问题,请参考以下文章