多种语言开发Spark-以WordCount为例

Posted 低调才是王道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多种语言开发Spark-以WordCount为例相关的知识,希望对你有一定的参考价值。

Spark是目前最火爆的大数据计算框架,有赶超Hadoop MapReduce的趋势。因此,趁着现在还有大多数人不懂得Spark开发的,赶紧好好学习吧,为了使不同的开发人员能够很好的利用Spark,Spark官方提供了不同开发语言的API,本文以大数据经典入门案例WordCount为例,开发多个版本的Spark应用程序,以满足不同的开发人员需求。

一、Scala:

  

    val conf: SparkConf = new SparkConf().setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    sc.textFile("test")
      .flatMap(line => {
        line.split("\t")
      })
      .mapPartitions(iter => {
        val list: List[(String, Int)] = List[(String, Int)]()
        iter.foreach(word => {
          list.::((word,1))
        })
        list.iterator
      })
      .reduceByKey(_ + _)
      .saveAsTextFile("result")

二、JDK1.7及以下版本:

  

SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("test")
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split("\t"));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
            
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        }).saveAsTextFile("result");

三、JDK1.8:

  由于JDK1.8加入了新特性——函数式编程,因此,可以利用JDK1.8的新特性简化Java开发Spark的语句。

SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("test")
        .flatMap(line -> {
            return Arrays.asList(line.split("\t"));
        }).mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        }).reduceByKey((x, y) -> {
            return x + y;
        }).saveAsTextFile("result");

  是不是觉得比上述的Scala还简洁呢?其实是这样的,Scala中使用了mapPartitions是对map函数的优化,即对每一个RDD的分区进行map操作,这样就减少了对象的创建,从而加速了计算。而Java中,通过我的测试,不能使用mapPartitions方法进行上述优化,只能使用map方法(不知道为啥),这样也可以使用,但是在大数据集面前,其性能就逊色于mapPartitions了。

 四、Python:

from pyspark import SparkContext
from pyspark import SparkConf as conf
conf.setAppName("WordCount").setMaster("local")
sc = SparkContext(conf)

text_file = sc.textFile("test")    .flatMap(lambda line: line.split("\t"))    .map(lambda word: (word, 1))    .reduceByKey(lambda x, y: x + y)    .saveAsTextFile("test")

 

以上是关于多种语言开发Spark-以WordCount为例的主要内容,如果未能解决你的问题,请参考以下文章

Spark 之WordCount

关于Spark、Scala实现WordCount的8种写法(多种写法)

spark怎么以master yarn-cluster模式运行wordcount

小记--------spark-Wordcount经典案例之对结果根据词频进行倒序排序

MapReduce入门详解(以WordCount为例)

使用java开发spark的wordcount程序