Spark的word count
Posted xiashiwendao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的word count相关的知识,希望对你有一定的参考价值。
word count
1 package com.spark.app 2 3 import org.apache.spark.{SparkContext, SparkConf} 4 5 /** 6 * Created by Administrator on 2016/7/24 0024. 7 */ 8 object WordCount { 9 def main(args: Array[String]) { 10 /** 11 * 第1步;创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息 12 * 例如 setAppName用来设置应用程序的名称,在程序运行的监控界面可以看到该名称, 13 * setMaster设置程序运行在本地还是运行在集群中,运行在本地可是使用local参数,也可以使用local[K]/local[*], 14 * 可以去spark官网查看它们不同的意义。 如果要运行在集群中,以Standalone模式运行的话,需要使用spark://HOST:PORT 15 * 的形式指定master的IP和端口号,默认是7077 16 */ 17 val conf = new SparkConf().setAppName("WordCount").setMaster("local") 18 // val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077") // 运行在集群中 19 20 /** 21 * 第2步:创建SparkContext 对象 22 * SparkContext是Spark程序所有功能的唯一入口 23 * SparkContext核心作用: 初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend 24 * 同时还会负责Spark程序往Master注册程序 25 * 26 * 通过传入SparkConf实例来定制Spark运行的具体参数和配置信息 27 */ 28 val sc = new SparkContext(conf) 29 30 /** 31 * 第3步: 根据具体的数据来源(HDFS、 HBase、Local FS、DB、 S3等)通过SparkContext来创建RDD 32 * RDD 的创建基本有三种方式: 根据外部的数据来源(例如HDFS)、根据Scala集合使用SparkContext的parallelize方法、 33 * 由其他的RDD操作产生 34 * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴 35 */ 36 37 val lines = sc.textFile("D:/resources/README.md") // 读取本地文件 38 // val lines = sc.textFile("/library/wordcount/input") // 读取HDFS文件,并切分成不同的Partition 39 // val lines = sc.textFile("hdfs://master:9000/libarary/wordcount/input") // 或者明确指明是从HDFS上获取数据 40 41 /** 42 * 第4步: 对初始的RDD进行Transformation级别的处理,例如 map、filter等高阶函数来进行具体的数据计算 43 */ 44 val words = lines.flatMap(_.split(" ")).filter(word => word != " ") // 拆分单词,并过滤掉空格,当然还可以继续进行过滤,如去掉标点符号 45 46 val pairs = words.map(word => (word, 1)) // 在单词拆分的基础上对每个单词实例计数为1, 也就是 word => (word, 1) 47 48 val wordscount = pairs.reduceByKey(_ + _) // 在每个单词实例计数为1的基础之上统计每个单词在文件中出现的总次数, 即key相同的value相加 49 // val wordscount = pairs.reduceByKey((v1, v2) => v1 + v2) // 等同于 50 51 wordscount.collect.foreach(println) // 打印结果,使用collect会将集群中的数据收集到当前运行drive的机器上,需要保证单台机器能放得下所有数据 52 53 sc.stop() // 释放资源 54 55 } 56 }
注意spark的套路:
1. 创建配置配置,创建sparkcontext;
2. 获取数据源;
3. flatmap进行元素独立;
4. filter进行过滤;
5. map封装为元组;
6. reduce进行计数;
按照数量排序
1 package com.spark.app 2 3 import org.apache.spark.{SparkContext, SparkConf} 4 5 /** 6 * Created by Administrator on 2016/7/24 0024. 7 */ 8 object WordCountSorted { 9 def main(args: Array[String]) { 10 def conf = new SparkConf().setAppName("WordCountSorted").setMaster("local") 11 def sc = new SparkContext(conf) 12 13 val lines = sc.textFile("D:/resources/README.md") 14 val words = lines.flatMap(_.split(" ")).filter(word => word != " ") 15 val pairs = words.map(word => (word, 1)) 16 17 /** 18 * 在这里通过reduceByKey方法之后可以获得每个单词出现的次数 19 * 第一个map将单词和出现的次数交换,将出现的次数作为key,使用sortByKey进行排序(false为降序) 20 * 第二个map将出现的次数和单词交换,这样还是恢复到以单词作为key 21 */ 22 val wordcount = pairs.reduceByKey(_ + _).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)) 23 wordcount.collect.foreach(println) 24 25 sc.stop() 26 } 27 }
你可以采用一条龙的方式来进行上述实现,感觉那是一个畅快!
以上是关于Spark的word count的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming和Flink的Word Count对比
Learning Spark——使用spark-shell运行Word Count