使用 MapReduce 实现分组排名

Posted by cauwt

tags:

篇首语：本文由小常识网(cha138.com)小编为大家整理，主要介绍了使用 MapReduce 实现分组排名相关的知识，希望对你有一定的参考价值。

题目:

1.输出男女组各前3名。

2.分组降序输出所有人的成绩。

3.对应的SQL语句。

数据:

scores 表（输入文件每行一条记录，字段以英文逗号分隔）：name,age,gender,score

解答如下:

1. 输出男女组各前3名。

Java代码

 1 class ContestStatTop3Mapper extends Mapper<Object, Text, Text, Text> {
 2 
 3   private Text outKey = new Text();
 4   private Text outValue = new Text();
 5 
 6   @Override
 7   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 8     // name, age, gender, score
 9     String[] arr = value.toString().split(",");
10     outKey.set(arr[2]); // gender
11     outValue.set(arr[0] + "," + arr[1] + "," + arr[3]); // name,age,score
12     context.write(outKey, outValue);
13   }
14 }
15 
16 class ContestStatTop3Reducer extends Reducer<Text, Text, Text, Text> {
17 
18 
19   private Text outKey = new Text();
20   private Text outValue = new Text();
21 
22   private int maxSize = 3;
23 
24   private List<ContestPerson> list = new ArrayList<>(maxSize);
25 
26   @Override
27   public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
28     String gender = key.toString();
29     int pos =0;
30     int minScore = Integer.MAX_VALUE;
31     list.clear();
32     for (Text t : values) {
33       String[] arr = t.toString().split(",");
34       // name,age,score
35       String name = arr[0];
36       int age = Integer.parseInt(arr[1]);
37       int score = Integer.parseInt(arr[2]);
38       ContestPerson person = new ContestPerson(name,age,gender,score);
39       if (list.size() < maxSize) { //always add if the size is less than maxSize
40         list.add(person);
41         if (score <= minScore) { // update minScore
42           minScore = score;
43           pos = list.size() - 1;// remember the position of person with minScore
44         }
45       } else {
46         if(score > minScore){
47         // remove the person with minScore
48           list.remove(pos);
49         // add the new person
50           list.add(person);
51         // update minScore
52           minScore = person.score;
53           pos = list.size() -1;
54           for(int i = 0; i< list.size();i++){
55             if(minScore > list.get(i).score){
56               minScore = list.get(i).score;
57               pos =i;
58             }
59           }
60         }
61       }
62     }
63     //output
64     outKey.set(gender);
65     for(ContestPerson person: list){
66       outValue.set(person.toString());
67       context.write(outKey, outValue);
68 
69     }
70   }
71 }

SQL 代码

 # option 1
 # A person is in the top 3 of their gender when at most 3 people of that gender
 # (themselves included) rank at or above them. Ties on score are broken the same way
 # the final ORDER BY sorts: lower age first, then name. Without this tie-break, a score
 # tie straddling 3rd place would drop every tied row and return fewer than 3 people.
 select m.name,m.age,m.gender,m.score from scores as m
 where 3 >= (
   select count(*) from scores as s
   where s.gender = m.gender
     and (s.score > m.score
       or (s.score = m.score and s.age < m.age)
       or (s.score = m.score and s.age = m.age and s.name <= m.name))
 )
 order by m.gender,m.score desc,m.age,m.name;

 # option 2
 # Same ranking rule as a self-join: every row joins to itself and to everyone ranked
 # above it, so count(*) is the row's rank within its gender.
 select m.name,m.age,m.gender,m.score from scores as m
 left join scores as s on s.gender = m.gender
   and (s.score > m.score
     or (s.score = m.score and s.age < m.age)
     or (s.score = m.score and s.age = m.age and s.name <= m.name))
 group by m.name,m.age,m.gender,m.score
 having count(*) <= 3
 order by m.gender,m.score desc,m.age,m.name;

 

2.分组降序输出所有人的成绩。

Java代码

 1 class ContestStatSortMapper extends Mapper<Object, Text, Text, Text> {
 2 
 3   private Text outKey = new Text();
 4   private Text outValue = new Text();
 5 
 6   @Override
 7   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 8     // name, age, gender, score
 9     String[] arr = value.toString().split(",");
10     outKey.set(arr[2]); // gender
11     outValue.set(arr[0] + "," + arr[1] + "," + arr[3]); // name,age,score
12     context.write(outKey, outValue);
13   }
14 }
15 
16 class ContestStatSortReducer extends Reducer<Text, Text, Text, Text> {
17 
18 
19   private Text outKey = new Text();
20   private Text outValue = new Text();
21 
22   private int maxSize = 3;
23 
24   private List<ContestPerson> list = new ArrayList<>(maxSize);
25 
26   @Override
27   public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
28     String gender = key.toString();
29     list.clear();
30     for (Text t : values) {
31       String[] arr = t.toString().split(",");
32       // name,age,score
33       String name = arr[0];
34       int age = Integer.parseInt(arr[1]);
35       int score = Integer.parseInt(arr[2]);
36       ContestPerson person = new ContestPerson(name,age,gender,score);
37       list.add(person);
38     }
39     //sort by score desc, then by age asc, then by name asc
40     Collections.sort(list, new Comparator<ContestPerson>() {
41       @Override
42       public int compare(ContestPerson o1, ContestPerson o2) {
43         if (o1.score > o2.score) {
44           return -1;
45         } else if (o1.score < o2.score) {
46           return 1;
47         } else {
48           if (o1.age > o2.age) {
49             return 1;
50           } else if (o1.age < o2.age) {
51             return -1;
52           } else {
53             return o1.name.compareTo(o2.name);
54           }
55         }
56       }
57     });
58     //output
59     outKey.set(gender);
60     for(ContestPerson person: list){
61       outValue.set(person.toString());
62       context.write(outKey, outValue);
63     }
64   }
65 }

SQL代码

# every contestant, grouped by gender, best score first (ties: lower age, then name)
select name, age, gender, score
from scores
order by gender asc, score desc, age asc, name asc;

附：Java 代码中所使用的 ContestPerson 类

 /**
  * One contestant record, as parsed by the reducers.
  *
  * <p>{@link #toString()} renders the record as the comma-separated output line
  * {@code name,age,gender,score}.
  */
 class ContestPerson {
   public final String name;
   public final int age;
   public final String gender;
   public final int score;

   public ContestPerson(String name, int age, String gender, int score) {
     this.name = name;
     this.age = age;
     this.gender = gender;
     this.score = score;
   }

   /** Returns {@code name,age,gender,score}. */
   @Override
   public String toString() {
     return this.name + "," + this.age + "," + this.gender + "," + this.score;
   }
 }

 

以上是关于使用 MapReduce 实现分组排名的主要内容，如果未能解决你的问题，请参考以下文章：

mapreduce 的二次排序

2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名

大数据之Hadoop(MapReduce):GroupingComparator分组案例实操

mysql5.7使用变量进行分组排名并筛选

mysql5.7使用变量进行分组排名并筛选

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现