Spark 之WordCount
Posted 逆风飞翔的小叔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 之WordCount相关的知识,希望对你有一定的参考价值。
前言
学习任何一门语言,都是从helloword开始,对于大数据框架来说,则是从wordcount开始,spark也不例外,作为一门大数据处理框架,在系统的学习spark之前,让我们先从一个wordcount开始吧!
环境准备说明
Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala ,咱们当 前使用的 Spark 版本为 3.0.0 ,默认采用的 Scala 编译版本为 2.12 ,所以后续开发时。我们依 然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件;
maven 中添加基础依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
在工程的resources目录下,准备如下的一个文件,里面有一行行单词,我们将在下面的程序中统计出这个文件中各个单词的数量;
三种方式实现 wordcount
方式一
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
object HelloWord
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
// 转换数据结构 word => (word, 1)
val wordGroup : RDD[(String,Iterable[String])] = words.groupBy(word => word)
//对分组后的数据进行转换
val wordCount = wordGroup.map
case (word, list) =>
(word, list.size)
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
运行上面的代码,控制台观察输出效果,
方式二
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD
object WordCount2
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word,1)
)
// 转换数据结构 word => (word, 1)
val wordGroup = wordToOne.groupBy(
t => t._1
)
//对分组后的数据进行转换
val wordCount = wordGroup.map
case (word, list) =>
list.reduce(
(t1,t2) =>
(t1._1,t1._2 + t2._2)
)
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
运行上面的代码,观察控制台输出效果
方式三
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD
object WordCount3
def main(args: Array[String]): Unit =
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount");
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf);
// 读取文件数据
val lines: RDD[String] = sc.textFile("E:\\\\code-self\\\\spi\\\\spark-test\\\\src\\\\main\\\\resources\\\\data1.txt")
//字符串的拆分,整体拆分成个体数据【数据扁平化处理】
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(
word => (word,1)
)
//相同的key的数据对value做聚合
//val wordCount = wordToOne.reduceByKey((x,y) => x + y)
//val wordCount = wordToOne.reduceByKey(_+_)
val wordCount = wordToOne.reduceByKey((x,y) => x + y)
//将转换的结果采集并输出到控制台
val array : Array[(String,Int)] = wordCount.collect()
array.foreach(println)
//关闭 Spark 连接
sc.stop()
运行上面的代码,观察控制台输出效果
以上三种方式,最终能够达到统计单词的数量的目的,其中前面两种采用了比较常规的方式,即先对数据进行扁平化,拆分成一个个的单词,然后再对单词进行分组聚合,最后得到统计结果; 而第三种方式,则是使用了spark的map-reduce思想,直接通过reduceByKey这个方法,一次性的完成了对数据的分组聚合操作,省区了前面的步骤,属于一种更加优化的实现思路;
以上是关于Spark 之WordCount的主要内容,如果未能解决你的问题,请参考以下文章
小记--------spark-Wordcount经典案例之对结果根据词频进行倒序排序