Spark Cluster Configuration and a Small Java Demo for Spark
Posted by ye-hcj
Spark
Installation
tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz
rm spark-2.4.0-bin-hadoop2.7.tgz
mv spark-2.4.0-bin-hadoop2.7 spark
sudo vim /etc/profile
export SPARK_HOME=/usr/local/spark/spark
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile
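Prepare four machines: master, worker1, worker2, and worker3.
First make sure your Hadoop cluster runs correctly, with worker1, worker2, and worker3 as DataNodes and master as the NameNode.
For the Hadoop setup, see my blog post: https://www.cnblogs.com/ye-hcj/p/10192857.html
Configuration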
spark-env.sh
Go to Spark's conf directory and create spark-env.sh from the template:
cp spark-env.sh.template spark-env.sh
sudo vim spark-env.sh
Add the following settings:
export JAVA_HOME=/usr/local/jdk/jdk-11.0.1
export SCALA_HOME=/usr/local/scala/scala
export HADOOP_HOME=/usr/local/hadoop/hadoop-3.1.1
export SPARK_HOME=/usr/local/spark/spark
export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-3.1.1/etc/hadoop
export SPARK_MASTER_HOST=master
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=1
slaves
In Spark's conf directory, create slaves from the template:
cp slaves.template slaves
sudo vim slaves
List the hosts, one per line:
master
worker1
worker2
worker3
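Startup
On master, run sbin/start-all.sh from the Spark directory. Then open http://master:8080/ to see the Spark UI.
Using Java with Spark
A small demo that counts the men and women in 100,000 records.
Java code that generates the mock data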
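import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;

// The class name is arbitrary; the original post showed only the method body.
public class DataGenerator {
    public static void main(String[] args) {
        // Mock data: 100,000 people. The goal is to compare the number of
        // young men and young women and see whether the ratio is balanced.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // The second argument (true) opens the file in append mode.
        try (FileOutputStream f = new FileOutputStream("C:\\Users\\26401\\Desktop\\data.txt", true)) {
            for (int count = 0; count < 100000; count++) {
                // Fields: id, age in [18, 28), sex (M or F)
                String str = count + " " + random.nextInt(18, 28) + " " + (random.nextBoolean() ? 'M' : 'F');
                // One record per line, so that textFile() sees each person as a separate element
                f.write((str + "\n").getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}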
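Each generated record has three space-separated fields: id, age, and sex. The sketch below shows one way to parse and validate such a line before analysing it. The class `PersonRecord` and its `parse` method are hypothetical helpers introduced here for illustration, and the sketch assumes one record per line.

```java
// Hypothetical helper, not part of the original post: parses one "id age sex" line.
final class PersonRecord {
    final int id;
    final int age;
    final char sex;

    PersonRecord(int id, int age, char sex) {
        this.id = id;
        this.age = age;
        this.sex = sex;
    }

    // Splits on whitespace and validates the three fields written by the generator.
    static PersonRecord parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        if (!parts[2].equals("M") && !parts[2].equals("F")) {
            throw new IllegalArgumentException("sex must be M or F: " + line);
        }
        // Integer.parseInt throws NumberFormatException on a non-numeric id or age.
        return new PersonRecord(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), parts[2].charAt(0));
    }

    public static void main(String[] args) {
        PersonRecord r = parse("42 23 M");
        System.out.println(r.id + " " + r.age + " " + r.sex); // prints "42 23 M"
    }
}
```

Checking the third field explicitly is stricter than a substring match, which only works here because neither the id nor the age can contain a letter.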
Dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0.0</version>
    <name>test</name>
    <description>Test project for spring boot mybatis</description>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <useUniqueVersions>false</useUniqueVersions>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Java code
package test;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App implements Serializable {

    private static final long serialVersionUID = -7114915627898482737L;

    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger(App.class);

        // Connect to the standalone master configured earlier
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("spark://master:7077");
        sparkConf.set("spark.submit.deployMode", "cluster");
        sparkConf.setAppName("FirstTest");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Each line of data.txt becomes one element of the RDD
        JavaRDD<String> file = sc.textFile("hdfs://master:9000/data.txt");

        // Keep only the records for men
        JavaRDD<String> male = file.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String s) throws Exception {
                logger.info(s);
                return s.contains("M");
            }
        });

        logger.info("**************************************");
        logger.info(male.count() + ""); // 49991
        logger.info("**************************************");

        sc.close();
        // Look up the other APIs yourself; they are simple, and you can also
        // just explore them through your IDE's autocompletion.
    }
}
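The job above keeps the lines that contain "M" and counts them. To sanity-check that logic on a handful of records without a cluster, the same filter-and-count can be sketched with plain Java streams. This is only a local illustration, not Spark code; `LocalGenderCount` and `countBySex` are hypothetical names, and the sketch assumes the "id age sex" record format produced by the generator.

```java
import java.util.Arrays;
import java.util.List;

// Local, Spark-free sketch of the same filter-and-count logic (names are illustrative).
final class LocalGenderCount {

    // Counts lines whose third whitespace-separated field equals the given sex code.
    static long countBySex(List<String> lines, String sex) {
        return lines.stream()
                .map(line -> line.trim().split("\\s+"))
                .filter(fields -> fields.length == 3 && fields[2].equals(sex))
                .count();
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("0 18 M", "1 27 F", "2 22 M", "3 19 F", "4 25 M");
        // prints "male=3 female=2"
        System.out.println("male=" + countBySex(sample, "M") + " female=" + countBySex(sample, "F"));
    }
}
```

Running the female count as well makes it possible to compare the two groups directly, which is the stated goal of the demo.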
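Running
1. Upload the generated test data file data.txt to HDFS.
2. Upload the packaged jar to the master machine.
3. Run bin/spark-submit --master spark://master:7077 --class test.App test-1.0.0.jar
4. Open the Spark UI, where the logged messages are clearly visible.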