spark-scala-java实现wordcount

Posted chenxiaoge

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-scala-java实现wordcount相关的知识,希望对你有一定的参考价值。

引入:spark-scala-java实现wordcount

1.spark-scala实现wordcount

package com.cw.scala.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
  * spark wordcount
  * hello java
  * hello java
  * hello spark
  * *
  * flatMap:
  * hello
  * java
  * hello
  * java
  * hello
  * spark
  * *
  * map:
  * (hello,1)
  * (java,1)
  * (hello,1)
  * (java,1)
  * (hello,1)
  * (spark,1)
  *
  * reduceByKey:将相同的key先分组,再针对每一个组去计算,对每一个组内的value计算
  * 先分组
  * (hello,1)
  * (hello,1)
  * (hello,1)
  *
  * (java,1)
  * (java,1)
  *
  * (spark,1)
  */
object SparkWC {
  def main(args: Array[String]): Unit = {
    //conf可以设置SparkApplication的名称,设置Spark运行的模式
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    //SparkContext是通往spark集群的唯一通道
    val sc = new SparkContext(conf)
    //sc.textFiles(path) 能将path里的所有文件内容读出,以文件中的每一行作为一条记录的方式
    sc.textFile("./data/words").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)//这行代码要记住
    sc.stop()




    //    //conf可以设置SparkApplication的名称,设置Spark运行的模式
    //    val conf = new SparkConf()
    //    conf.setAppName("wordcount")
    //    conf.setMaster("local")
    //    //SparkContext是通往spark集群的唯一通道
    //    val sc = new SparkContext(conf)
    //
    //    val lines: RDD[String] = sc.textFile("./data/words")
    //    //flatMap
    //    val words: RDD[String] = lines.flatMap(line => {
    //      line.split(" ")
    //    })
    //    //KV:二元组
    //    val pairWords: RDD[(String, Int)] = words.map(word => {
    //      new Tuple2(word, 1)
    //    })
    //    //将相同的key先分组,再针对每一个组去计算,对每一个组内的value计算
    //    val result: RDD[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => {
    //      v1 + v2
    //    })
    //    result.foreach(one => {
    //      println(one)
    //    })
    //    sc.stop()

  }
}

详细版本

package com.cw.scala.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object SparkWC {
  def main(args: Array[String]): Unit = {

    //conf可以设置SparkApplication的名称,设置Spark运行的模式
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    conf.setMaster("local")
    //SparkContext是通往spark集群的唯一通道
    val sc = new SparkContext(conf)
    //sc.textFiles(path) 能将path里的所有文件内容读出,以文件中的每一行作为一条记录的方式
    val lines: RDD[String] = sc.textFile("./data/words")
    lines.foreach(println)

    //count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
    val l: Long = lines.count()
    println(l)
    //take(num):返回一个包含数据集前n个元素的集合。
    val strings: Array[String] = lines.take(3)
    strings.foreach(println)
    //first:first=take(1),返回数据集中的第一个元素。
    val str: String = lines.first()
    println(str)

    //flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
    val words: RDD[String] = lines.flatMap(line => {
      line.split(" ")
    })
    
    words.foreach(println)

    //map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
    val pairWords: RDD[(String, Int)] = words.map(word => {
      new Tuple2(word, 1)
    })

    pairWords.foreach(println)

    //reduceByKey:将相同的key先分组,再针对每一个组去计算,对每一个组内的value计算
    val result: RDD[(String, Int)] = pairWords.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })

    //foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
    result.foreach(println)
    sc.stop()

  }
}
=======================运行结果========================
//textFile:能将path里的所有文件内容读出,以文件中的每一行作为一条记录的方式
hello java
hello spark
hello hadoop
hello mr
hello java
hello spark
hello scala
hello mr
//count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
8
//take(3):返回一个包含数据集前n个元素的集合。
hello java
hello spark
hello hadoop
//first:返回数据集中的第一个元素。
hello java
//flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
hello
java
hello
spark
hello
hadoop
hello
mr
hello
java
hello
spark
hello
scala
hello
mr
//map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
(hello,1)
(java,1)
(hello,1)
(spark,1)
(hello,1)
(hadoop,1)
(hello,1)
(mr,1)
(hello,1)
(java,1)
(hello,1)
(spark,1)
(hello,1)
(scala,1)
(hello,1)
(mr,1)
//reduceByKey:将相同的key先分组,再针对每一个组去计算,对每一个组内的value计算
(scala,1)
(spark,2)
(hadoop,1)
(mr,2)
(hello,8)
(java,2)

2.spark-java实现wordcount

package com.cw.java.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class SparkWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //sc.textFiles(path) 能将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式,
        JavaRDD<String> lines = sc.textFile("./data/words");
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        /**
         * new Function2<Integer, Integer, Integer>() 如在(hello,1) (hello,1) (hello,1) 第一个hello为1赋给v1,第二个hello为1赋给v2,返回结果v1+v2=2
         * 下一条将2自动赋给v1,第三个hello的1赋给v2 返回v1+v2=3
         */

        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tp) throws Exception {
                System.out.println(tp);
            }
        });
        sc.stop();

    }
}

以上是关于spark-scala-java实现wordcount的主要内容,如果未能解决你的问题,请参考以下文章

hadoop集群提交代码

Storm常用操作命令及WordCount

2020/1/18寒假自学——学习进度报告4

使用shell脚本的Hadoop流:reducer因错误而失败:没有这样的文件或目录

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

一些字符和字符串库函数操作模拟实现