Notes: the classic Spark WordCount example, with results sorted by word frequency in descending order
Posted by yzqyxq
Let's use the classic WordCount example again.

The logic:
1. Split the text on spaces into individual words: flatMap()
2. Turn each word into a Tuple2 such as (hello, 1): map()
3. Sum the counts for identical keys, giving e.g. (hello, 5): reduceByKey()
4. Swap (hello, 5) into (5, hello): map()
5. Sort the swapped RDD by key: sortByKey()
6. Swap the sorted RDD back into (hello, 5): map()
7. Print the results: foreach()
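Before the Spark versions, the swap-sort-swap idea in the steps above can be sketched with plain Java collections (a toy illustration only, not Spark code; the sample sentence is made up):

```java
import java.util.*;

public class SwapSortSketch {
    // Returns "word: count" lines ordered by count, descending.
    static List<String> sortByFrequency(String text) {
        // steps 1-3: split on spaces and count each word (the reduceByKey step)
        Map<String, Integer> counts = new HashMap<>();
        for (String w : text.split(" ")) counts.merge(w, 1, Integer::sum);
        // step 4: swap each (word, count) into (count, word)
        List<Map.Entry<Integer, String>> swapped = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet())
            swapped.add(new AbstractMap.SimpleEntry<>(e.getValue(), e.getKey()));
        // step 5: sort on the count key, descending (the sortByKey(false) step)
        swapped.sort((a, b) -> Integer.compare(b.getKey(), a.getKey()));
        // step 6: swap back to (word, count) while formatting
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> e : swapped)
            out.add(e.getValue() + ": " + e.getKey());
        return out;
    }

    public static void main(String[] args) {
        // step 7: print
        sortByFrequency("hello you hello spark hello you").forEach(System.out::println);
    }
}
```

In Spark the same six transformations run distributed over an RDD, but the shape of the pipeline is identical.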
Scala version

```scala
import org.apache.spark.{SparkConf, SparkContext}

/**
 * SortWordCount example:
 * sort words by how often they occur, in descending order
 */
object SortWordCountScala {
  def main(args: Array[String]): Unit = {
    // create the SparkConf and SparkContext
    val conf = new SparkConf()
      .setAppName("sortWordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    // read the input file
    val lines = sc.textFile("D:\\daima\\work\\1011\\spark-test-zhonghuashishan\\file\\data_syn.txt")
    // the original WordCount steps
    val split = lines.flatMap(line => line.split(" "))
    val pairs = split.map(m => (m, 1))
    val reduced = pairs.reduceByKey(_ + _)
    // swap (you, 2) (hello, 3) into (2, you) (3, hello)
    val swapped = reduced.map(r => (r._2, r._1))
    // sort by key; the false argument means descending: (3, hello) (2, you)
    val sorted = swapped.sortByKey(false)
    // swap the sorted pairs back into (hello, 3) (you, 2)
    val result = sorted.map(m => (m._2, m._1))
    // print
    result.foreach(p => println("word " + p._1 + " appears " + p._2 + " time(s)"))
  }
}
```
Java version

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Sorted WordCount program:
 * sort words by how often they occur, in descending order
 */
public class SortWordCount {
    public static void main(String[] args) {
        // create the SparkConf and JavaSparkContext
        SparkConf conf = new SparkConf()
                .setAppName("SortWordCount")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // create the lines RDD
        JavaRDD<String> linesRDD = sc.textFile("D:\\daima\\work\\1011\\spark-test-zhonghuashishan\\file\\data_syn.txt");
        // the usual word-count steps: first split each line into words
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // turn each word into a pair such as (q, 1)
        JavaPairRDD<String, Integer> pairsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // sum the counts per word, giving e.g. (yu, 3) (hello, 2)
        JavaPairRDD<String, Integer> countsRDD = pairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // build new Tuple2s to swap (yu, 3) (hello, 2) into (3, yu) (2, hello)
        JavaPairRDD<Integer, String> swappedRDD = countsRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        });
        // sort by key, descending: (3, yu) (2, hello)
        JavaPairRDD<Integer, String> sortedRDD = swappedRDD.sortByKey(false);
        // swap the sorted (3, yu) (2, hello) back into (yu, 3) (hello, 2)
        JavaPairRDD<String, Integer> resultRDD = sortedRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        });
        // print
        resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println("word " + t._1 + " appears " + t._2 + " time(s)");
            }
        });
        // close the JavaSparkContext
        sc.close();
    }
}
```
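Since Java 8, anonymous inner classes like the ones above can be replaced with lambdas, and the double swap can be avoided entirely by sorting on the value. As a plain-JDK illustration of that alternative (a sketch on made-up sample text, not the Spark program above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class StreamWordCount {
    // Counts words and returns entries ordered by frequency, descending.
    static List<Map.Entry<String, Long>> topWords(String text) {
        return Arrays.stream(text.split(" "))
                // group identical words and count them (split + count in one pass)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                .entrySet().stream()
                // compare on the value directly -- no swap and swap-back needed
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> e : topWords("hello you hello spark hello you"))
            System.out.println(e.getKey() + ": " + e.getValue());
    }
}
```

Spark offers a similar shortcut: the RDD `sortBy` transformation sorts by an extracted key (here, the count), which also makes the two swap steps unnecessary.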