多种语言开发Spark-以WordCount为例
Posted 低调才是王道
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多种语言开发Spark-以WordCount为例相关的知识,希望对你有一定的参考价值。
Spark是目前最火爆的大数据计算框架,有赶超Hadoop MapReduce的趋势。因此,趁着现在还有大多数人不懂得Spark开发的,赶紧好好学习吧,为了使不同的开发人员能够很好的利用Spark,Spark官方提供了不同开发语言的API,本文以大数据经典入门案例WordCount为例,开发多个版本的Spark应用程序,以满足不同的开发人员需求。
一、Scala:
val conf: SparkConf = new SparkConf().setMaster("local") val sc: SparkContext = new SparkContext(conf) sc.textFile("test") .flatMap(line => { line.split("\t") }) .mapPartitions(iter => { val list: List[(String, Int)] = List[(String, Int)]() iter.foreach(word => { list.::((word,1)) }) list.iterator }) .reduceByKey(_ + _) .saveAsTextFile("result")
二、JDK1.7及以下版本:
SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); sc.textFile("test") .flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split("\t")); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>(t, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }).saveAsTextFile("result");
三、JDK1.8:
由于JDK1.8加入了新特性——函数式编程,因此,可以利用JDK1.8的新特性简化Java开发Spark的语句。
SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); sc.textFile("test") .flatMap(line -> { return Arrays.asList(line.split("\t")); }).mapToPair(word -> { return new Tuple2<String, Integer>(word, 1); }).reduceByKey((x, y) -> { return x + y; }).saveAsTextFile("result");
是不是觉得比上述的Scala还简洁呢?其实是这样的,Scala中使用了mapPartitions是对map函数的优化,即对每一个RDD的分区进行map操作,这样就减少了对象的创建,从而加速了计算。而Java中,通过我的测试,不能使用mapPartitions方法进行上述优化,只能使用map方法(不知道为啥),这样也可以使用,但是在大数据集面前,其性能就逊色于mapPartitions了。
四、Python:
from pyspark import SparkContext from pyspark import SparkConf as conf conf.setAppName("WordCount").setMaster("local") sc = SparkContext(conf) text_file = sc.textFile("test") .flatMap(lambda line: line.split("\t")) .map(lambda word: (word, 1)) .reduceByKey(lambda x, y: x + y) .saveAsTextFile("test")
以上是关于多种语言开发Spark-以WordCount为例的主要内容,如果未能解决你的问题,请参考以下文章
关于Spark、Scala实现WordCount的8种写法(多种写法)
spark怎么以master yarn-cluster模式运行wordcount