spark 分组取topn
Posted tele-share
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 分组取topn相关的知识,希望对你有一定的参考价值。
### Java 实现
1 /**
2 *分组取topn,有序数列去除一些项后,仍然有序,所以应当先排序后分组
3 *@author Tele
4 *
5 */
6 public class TopDemo2 {
7 private static SparkConf conf = new SparkConf().setMaster("local").setAppName("topdemo2");
8 private static JavaSparkContext jsc = new JavaSparkContext(conf);
9
10 public static <U> void main(String[] args) {
11 JavaRDD<String> rdd = jsc.textFile("./src/main/java/base_demo/top/score.txt");
12
13 JavaPairRDD<Integer, String> mapToPair = rdd.mapToPair(new PairFunction<String, Integer, String>() {
14
15 private static final long serialVersionUID = 1L;
16
17 @Override
18 public Tuple2<Integer, String> call(String t) throws Exception {
19 String[] fields = t.split(" ");
20 return new Tuple2<Integer, String>(Integer.parseInt(fields[1]), fields[0]);
21 }
22 });
23
24 // 先排序
25 JavaPairRDD<Integer, String> sortByKey = mapToPair.sortByKey(false);
26
27 // 互换位置以便分组
28 JavaPairRDD<String, Integer> mapToPair2 = sortByKey
29 .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
30
31 private static final long serialVersionUID = 1L;
32
33 @Override
34 public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
35 return new Tuple2<String, Integer>(t._2, t._1);
36 }
37 });
38
39 // 分组
40 JavaPairRDD<String, Iterable<Integer>> groupByKey2 = mapToPair2.groupByKey();
41
42 // 取前三
43 JavaPairRDD<String, Iterable<Integer>> result = groupByKey2
44 .mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
45
46 private static final long serialVersionUID = 1L;
47
48 @Override
49 public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> t)
50 throws Exception {
51
52 return new Tuple2<String, Iterable<Integer>>(t._1,
53 IteratorUtils.toList(t._2.iterator()).subList(0, 3));
54 }
55 });
56
57 result.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
58
59 private static final long serialVersionUID = 1L;
60
61 @Override
62 public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
63 System.out.println(t._1 + t._2);
64 }
65 });
66
67 jsc.close();
68 }
69 }
### Scala 实现
/**
 * Grouped top-N: print the top 3 scores per name.
 * Sort globally by score first; grouping afterwards leaves each
 * group's scores in sorted order, so `take(3)` yields the top three.
 */
object TopDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topdemo2")
    val sc = new SparkContext(conf)

    // Input lines look like "name score" separated by a single space.
    val lines = sc.textFile("./src/main/scala/spark_core/top/score.txt", 1)

    // Parse into (score, name) so we can sort by score.
    val scoreByName = lines.map { line =>
      val fields = line.split(" ")
      (fields(1).toInt, fields(0))
    }

    // Descending sort, swap to (name, score), group, keep the first three.
    val topThree = scoreByName
      .sortByKey(false, 1)
      .map { case (score, name) => (name, score) }
      .groupByKey()
      .map { case (name, scores) => (name, scores.take(3)) }

    topThree.foreach(t => println(t._1 + "---" + t._2))
  }
}
以上是关于spark 分组取topn的主要内容,如果未能解决你的问题,请参考以下文章