Spark 之WordCount

Posted 逆风飞翔的小叔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 之WordCount相关的知识,希望对你有一定的参考价值。

前言

学习任何一门语言,都是从helloword开始,对于大数据框架来说,则是从wordcount开始,spark也不例外,作为一门大数据处理框架,在系统的学习spark之前,让我们先从一个wordcount开始吧!

环境准备说明

Spark Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala ,咱们当 前使用的 Spark 版本为 3.0.0 ,默认采用的 Scala 编译版本为 2.12 ,所以后续开发时。我们依 然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件;

maven 中添加基础依赖

<dependencies>
 <dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_2.12</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>
在工程的resources目录下,准备如下的一个文件,里面有一行行单词,我们将在下面的程序中统计出这个文件中各个单词的数量;

 

三种方式实现 wordcount

方式一

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

object HelloWord 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 转换数据结构 word => (word, 1)
    val wordGroup : RDD[(String,Iterable[String])] = words.groupBy(word => word)

    //对分组后的数据进行转换
    val wordCount = wordGroup.map 
      case (word, list) => 
        (word, list.size)
      
    

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  


运行上面的代码,控制台观察输出效果,

方式二

 

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object WordCount2 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne = words.map(
      word => (word,1)
    )

    // 转换数据结构 word => (word, 1)
    val wordGroup = wordToOne.groupBy(
      t => t._1
    )

    //对分组后的数据进行转换
    val wordCount = wordGroup.map 
      case (word, list) => 

        list.reduce(
          (t1,t2) =>
            (t1._1,t1._2 + t2._2)
          
        )
      
    

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  


运行上面的代码,观察控制台输出效果

 

方式三

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object WordCount3 

  def main(args: Array[String]): Unit = 

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
    // 创建 Spark 上下文环境对象(连接对象)
    val sc: SparkContext = new SparkContext(sparkConf);

    // 读取文件数据
    val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")

    //字符串的拆分,整体拆分成个体数据【数据扁平化处理】
    val words: RDD[String] = lines.flatMap(_.split(" "))

    val wordToOne = words.map(
      word => (word,1)
    )

    //相同的key的数据对value做聚合
    //val wordCount = wordToOne.reduceByKey((x,y) => x + y)
    //val wordCount = wordToOne.reduceByKey(_+_)
    val wordCount = wordToOne.reduceByKey((x,y) => x + y)

    //将转换的结果采集并输出到控制台
    val array : Array[(String,Int)] = wordCount.collect()

    array.foreach(println)

    //关闭 Spark 连接
    sc.stop()
  


运行上面的代码,观察控制台输出效果

 

以上三种方式,最终能够达到统计单词的数量的目的,其中前面两种采用了比较常规的方式,即先对数据进行扁平化,拆分成一个个的单词,然后再对单词进行分组聚合,最后得到统计结果; 而第三种方式,则是使用了spark的map-reduce思想,直接通过reduceByKey这个方法,一次性的完成了对数据的分组聚合操作,省区了前面的步骤,属于一种更加优化的实现思路;

以上是关于Spark 之WordCount的主要内容,如果未能解决你的问题,请参考以下文章

小记--------spark-Wordcount经典案例之对结果根据词频进行倒序排序

6.SparkStreaming之WordCount(UpdateStateByKey)

09高级编程之基于排序机制的wordcount程序

Spark任务提交与执行之RDD的创建转换及DAG构建

Spark任务提交与执行之RDD的创建转换及DAG构建

大数据之Spark:Spark Streaming