Spark 编程模型(中)
Posted 百里登风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 编程模型(中)相关的知识,希望对你有一定的参考价值。
先在IDEA新建一个maven项目
我这里用的是jdk1.8,选择相应的骨架
这里选择本地在window下安装的maven
新的项目创建成功
我的开始pom.xml文件配置
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.gong.spark</groupId> <artifactId>learning-spark</artifactId> <version>1.0-SNAPSHOT</version> <inceptionYear>2008</inceptionYear> <properties> <scala.version>2.10.4</scala.version> <spark.version>1.6.1</spark.version> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.4</version> <scope>test</scope> </dependency> <dependency> <groupId>org.specs</groupId> <artifactId>specs</artifactId> <version>1.2.5</version> <scope>test</scope> </dependency> <!--spark--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <!-- <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> --> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <executions> <!-- Run shade goal on package phase --> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <!-- add Main-Class to manifest file --> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!--<mainClass>com.dajiang.MyDriver</mainClass>--> </transformer> </transformers> <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> </execution> </executions> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting> </project>
先在终端下试下打包
可以看到失败了!!!
把这几个生成默认的东西删除掉
再次测试
成功了
把他clean一下
进入自己在虚拟机安装的centos里面的spark
先测试一下spark环境有没有问题
OK运行成,环境没问题!
1.创建RDD
方式一:从集合创建RDD
回到idea,在main路径下新建java目录,并且对其以下操作:
在test路径下新建java目录,对其以下操作:
建包
在当前包下起个名字,其实也就是在这个包的路径下再建下一级目录
因为我们现在要写的是java程序,所以新建一个java类
写个简单的程序测下运行一下
OK没问题,可以运行!
把这里的这个插件由原来的1.5改成1.8,因为刚刚跑的时候有警告
在这里新建一个包,具体怎么操作这里就不重复了
插入之前写好的MyJavaWordCount.java的代码
MyJavaWordCount.java参考代码
package com.gong.spark.chap2_3; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; public class MyJavaWordCount { public static void main(String[] args){ //参数检查 if(args.length<2) { System.err.println("Usage:MyJavaCount <input> <output>"); System.exit(1); } //获取参数 String input=args[0]; String output=args[1]; //创建java版本的SparkContext SparkConf conf=new SparkConf().setAppName("MyJavaWordCount"); JavaSparkContext sc = new JavaSparkContext(conf); //读取数据 JavaRDD<String> inputRdd=sc.textFile(input); //进行相关计算 JavaRDD<String> words=inputRdd.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairRDD<String,Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String,Integer> call(String word) throws Exception{ return new Tuple2(word,1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer x, Integer y) throws Exception { return x+y; } }); //保存结果 result.saveAsTextFile(output); //关闭sc sc.stop(); } }
把之前写好的scala版本的WordCount程序放进来
参考代码:
package com.gong.spark import org.apache.spark.{SparkConf, SparkContext} object MyWordCount { def main(args: Array[String]):Unit={ //参数检查 if(args.length<2){ System.err.println("Usage:MyWordCount <input> <output>") System.exit(1) } //获取参数 val input=args(0) val output=args(1) //创建SparkContext val conf=new SparkConf().setAppName("myWordCount") val sc=new SparkContext(conf) //读取数据 val lines=sc.textFile(input) //进行相关计算 val resultRdd=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //保存结果 resultRdd.saveAsTextFile(output) sc.stop() } }
在终端mvn package
可以看到失败了,看来还是要把这里还回1.5版本的,不能乱改
再次在终端mvn package,可以看到成功了!
方式二;读取外部存储创建RDD
transformation操作
惰性求值
转换操作
基本转换操作1
基本转换操作2
控制操作
action操作
以上是关于Spark 编程模型(中)的主要内容,如果未能解决你的问题,请参考以下文章