Submitting Jobs to Spark (WordCount as an Example)
Posted by running_wolf
1. First, set up a working Hadoop + Spark environment and make sure the services are healthy. This article uses WordCount as the running example.
2. Create the source file, i.e. the input. Create hello.txt with the following content:
tom jerry
henry jim
suse lusy
Note: words are separated by spaces.
3. Then run the following commands:
hadoop fs -mkdir -p /Hadoop/Input    (create the directory on HDFS)
hadoop fs -put hello.txt /Hadoop/Input    (upload hello.txt to HDFS)
hadoop fs -ls /Hadoop/Input    (list the uploaded file)
hadoop fs -text /Hadoop/Input/hello.txt    (print the file contents)
4. First smoke-test the WordCount job with spark-shell.
(1) Start spark-shell. Normally this is run from Spark's bin directory, but here it is on the PATH via an environment variable.
(2) Then enter the Scala statements directly:
val file=sc.textFile("hdfs://hacluster/Hadoop/Input/hello.txt")
val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
rdd.collect()
rdd.foreach(println)
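With the three-line hello.txt above, every word occurs exactly once, so the collected result should look something like the following (the ordering of a collect() is not guaranteed):
Array((tom,1), (jerry,1), (henry,1), (jim,1), (suse,1), (lusy,1))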
OK, the test passes.
5. WordCount in Scala
package com.example.spark

/**
 * User: hadoop
 * Date: 2017/8/17 0010
 * Time: 10:20
 */
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * Count word occurrences
 */
object ScalaWordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: <file>")
      System.exit(1)
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))

    // Split on spaces, pair each word with 1, sum per word, print
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }
}
6. WordCount in Java
package com.example.spark;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(args[0], 1);

        // Split each line into words.
        // Note: against Spark 2.x (the version this article's pom declares),
        // FlatMapFunction.call must return an Iterator, not an Iterable.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        // Pair each word with an initial count of 1
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        sc.stop();
    }
}
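For comparison, on Spark 2.x with Java 8 the same pipeline can be written much more compactly with lambdas. A minimal sketch (the class name LambdaWordCount is illustrative, not part of the original project):

package com.example.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public final class LambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LambdaWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same flatMap -> mapToPair -> reduceByKey pipeline as above, with lambdas
        JavaPairRDD<String, Integer> counts = sc.textFile(args[0])
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
        sc.stop();
    }
}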
7. Package the jar in IDEA.
(1) Open File -> Project Structure and configure a jar artifact for the project.
Click OK. Once configured, choose Build -> Build Artifacts... from the menu bar and run Build. When packaging completes, the status bar shows "Compilation completed successfully...", and the jar appears under the artifact's output path.
Upload the resulting wordcount.jar to a cluster node: scp wordcount.jar root@10.57.22.244:/opt/ (enter the VM's root password when prompted).
8. Run the jar.
This article runs the jar in Spark on YARN mode.
Run the Java WordCount:
spark-submit --master yarn-client --class com.example.spark.WordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt
Run the Scala WordCount:
spark-submit --master yarn-client --class com.example.spark.ScalaWordCount --executor-memory 1G --total-executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt
(Note: step 3 uploaded hello.txt to /Hadoop/Input; adjust the HDFS path in these commands to wherever your input file actually lives.)
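Two notes on the flags for readers on newer Spark versions: since Spark 2.0, --master yarn-client is deprecated in favor of --master yarn --deploy-mode client, and --total-executor-cores only applies to standalone and Mesos clusters; on YARN the equivalent knobs are --num-executors and --executor-cores. An equivalent modern invocation would look like:
spark-submit --master yarn --deploy-mode client --class com.example.spark.WordCount --executor-memory 1G --executor-cores 2 /opt/wordcount.jar hdfs://hacluster/aa/hello.txt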
This article uses the Java WordCount for the demonstration.
The above submits the job directly with spark-submit; next is a way to submit from a Java web application.
9. Submitting a job to Spark from a Java web application.
The web application is built with Spring Boot; the implementation is as follows:
(1) Create a new Maven project named spark-submit.
(2) pom.xml. Note that the Spark dependency artifacts must match your Scala version: in spark-core_2.11, for example, the trailing 2.11 is the installed Scala version.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>

    <groupId>wordcount</groupId>
    <artifactId>spark-submit</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <start-class>com.example.spark.SparkSubmitApplication</start-class>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <commons.version>3.4</commons.version>
        <org.apache.spark-version>2.1.0</org.apache.spark-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-solr</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <!-- Run on Jetty instead of the default embedded Tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.eclipse.jetty.websocket</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>apache-jsp</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- Spark; the _2.11 suffix must match the installed Scala version -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${org.apache.spark-version}</version>
        </dependency>
        <!-- Pinned to the Jackson line Spark 2.1.0 expects -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>maven2</id>
            <url>http://repo1.maven.org/maven2/</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-war-plugin</artifactId>
                <configuration>
                    <warSourceDirectory>src/main/webapp</warSourceDirectory>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <configuration>
                    <systemProperties>
                        <systemProperty>
                            <name>spring.profiles.active</name>
                            <value>development</value>
                        </systemProperty>
                        <systemProperty>
                            <name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
                            <!-- -1 means no limit -->
                            <value>600000</value>
                        </systemProperty>
                    </systemProperties>
                    <useTestClasspath>true</useTestClasspath>
                    <webAppConfig>
                        <contextPath>/</contextPath>
                    </webAppConfig>
                    <connectors>
                        <connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
                            <port>7080</port>
                        </connector>
                    </connectors>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
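One design point worth noting in this pom: the Spark and Hadoop artifacts are left at the default compile scope rather than provided, because this web application drives Spark's submission machinery inside its own JVM (see SubmitJobToSpark below), so those classes must be on the webapp's runtime classpath.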
(3) SubmitJobToSpark.java
package com.example.spark;

import org.apache.spark.deploy.SparkSubmit;

/**
 * @author kevin
 */
public class SubmitJobToSpark {

    public static void submitJob() {
        // The same arguments you would pass to the spark-submit script
        String[] args = new String[] {
                "--master", "yarn-client",
                "--name", "test java submit job to spark",
                "--class", "com.example.spark.WordCount",
                "/opt/wordcount.jar",
                "hdfs://hacluster/aa/hello.txt"
        };
        SparkSubmit.main(args);
    }
}
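A caveat on this approach: org.apache.spark.deploy.SparkSubmit is Spark's internal command-line entry point rather than a public API, and on errors it can call System.exit, which would take the whole web container down with it. Spark's supported programmatic route is SparkLauncher from the spark-launcher module; a rough sketch of the equivalent submission (assuming SPARK_HOME is set on the web server; the class name SubmitJobViaLauncher is illustrative):

package com.example.spark;

import java.io.IOException;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SubmitJobViaLauncher {

    public static void submitJob() throws IOException {
        // Spawns spark-submit in a child process, so a failing job
        // cannot kill the web container's JVM.
        SparkAppHandle handle = new SparkLauncher()
                .setMaster("yarn-client")
                .setAppName("test java submit job to spark")
                .setMainClass("com.example.spark.WordCount")
                .setAppResource("/opt/wordcount.jar")
                .addAppArgs("hdfs://hacluster/aa/hello.txt")
                .startApplication();

        // The handle reports state transitions (SUBMITTED, RUNNING, FINISHED, ...)
        System.out.println("Launched, current state: " + handle.getState());
    }
}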
(4) SparkController.java
package com.example.spark.web.controller;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.example.spark.SubmitJobToSpark;

@Controller
@RequestMapping("spark")
public class SparkController {
    private Logger logger = LoggerFactory.getLogger(SparkController.class);

    @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
    @ResponseBody
    public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
        logger.info("start submitting spark task...");
        SubmitJobToSpark.submitJob();
        return "hello";
    }
}
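Note that submitJob() runs on the servlet request thread: in yarn-client mode the submission does not return until the application finishes, so the browser only receives "hello" after the whole WordCount job completes. For anything beyond a demo, hand the call off to a background executor and return immediately.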
(5) Package the spark-submit project as a war, deploy it to Tomcat on the Master node, and request the following URL:
http://10.57.22.244:9090/spark/sparkSubmit
The computed word counts appear in Tomcat's log.