底层战详解使用Java开发Spark程序(DT大数据梦工厂)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了底层战详解使用Java开发Spark程序(DT大数据梦工厂)相关的知识,希望对你有一定的参考价值。
Scala开发Spark很多,为什么还要用Java开发原因:1、一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交;2、Scala学习角度讲,比Java难。找Scala的高手比Java难,项目的维护和二次开发比较困难;3、很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combationHadoop以后作为基础设施,Spark作为灵魂1、下载最新版的Eclipse2、解压并启动Eclipse3、创建Maven工程4、建立quick-start5、通过buildpath把默认的java5变成java86、配置pom.xml程序,添加程序开发时候的相关依赖,并配置具体打包的buildhttp://maven.outofmemory.cn/org.apache.sparkpackage com.dt.spark.SparkApps.cores;
import java.util.Arrays;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/** * 使用JAVA的方式进行本地测试开发Spark的Word Count * * @author DT大数据梦工厂 * */
public class WordCount {
public static void main(String[] args) { // TODO Auto-generated method stub /** * 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息 * 例如:通过setMaster来设置程序要连接的Spark集群的 url, * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者 */ SparkConf conf = new SparkConf().setAppName("Spark Word Count In JAVA").setMaster("local");
/** * 2、创建SparkContext对象 * SparkContext是Spark程序所有功能的唯一入口,无论采用 Scala、Java、Python、R等都必须要( * 不同的语言具体的类名称不同,如果是JAVA,则为JavaSparkContext) * SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler * 、SchedulerBackend 同时还会负责Spark程序往Master注册程序等 * SparkContext是整个Spark应用程序中最为至关重要的一个对象 */ JavaSparkContext sc = new JavaSparkContext(conf );// 其地产曾实际上就是 Scala的SparkContext
/** * 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD * RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据 Scala集合、由其它的RDD操作 * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴 */ JavaRDD<String> lines = sc.textFile( "F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md" );
/** * 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算 * 4.1、将每一行的字符串拆分成单个的单词 FlatMapFunction<String, * String>是匿名内部类第二个参数泛型,因为知道是String,所以用String */ JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() {// 如果是 Scala,由于SAM转换,所以可以写成1行代码 val // words // = // lines.flatMap // { // line // => // line.split(" // ") // }
@Override public Iterable<String> call(String line ) throws Exception { // TODO Auto-generated method stub return Arrays.asList( line.split( " ")); } });
/** * 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1) */ JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word, 1); } });
/** * 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数 */
JavaPairRDD<String, Integer> wordCounts = pairs .reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer v1 , Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2 ; } }); wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> wordNumberPair) throws Exception { // TODO Auto-generated method stub System. out.println(wordNumberPair ._1 +":" +wordNumberPair ._2 ); } }); /** * 5、关闭Spark上下文释放相关资源 */ sc.close();
}
}
关于在Windows下找不winutils.exe
https://github.com/amihalik/hadoop-common-2.6.0-bin/blob/master/bin/winutils.exe
作业:用JAVA方式采用maven方式开发WordCount并运行在集群上
王家林老师名片:
中国Spark第一人
新浪微博:http://weibo.com/ilovepains
微信公众号:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
手机:18610086859
QQ:1740415547
本文出自 “一枝花傲寒” 博客,谢绝转载!
以上是关于底层战详解使用Java开发Spark程序(DT大数据梦工厂)的主要内容,如果未能解决你的问题,请参考以下文章
Spark 1.6 RPC内幕解密:运行机制源码详解Netty与Akka等(DT大数据梦工厂)
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java