使用 MapReduce 实现分组排名
Posted cauwt
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 MapReduce 实现分组排名相关的知识,希望对你有一定的参考价值。
题目:
1. 按性别分组，输出男、女两组各自成绩最高的前 3 名。
2. 按性别分组，降序输出所有人的成绩（成绩相同时按年龄升序、再按姓名升序排列）。
3. 写出与上述两问对应的 SQL 语句。
数据:
表 scores，每行一条逗号分隔的记录，字段依次为 name,age,gender,score（age 与 score 为整数）。
解答如下:
1. 输出男女组各前3名。
Java代码
1 class ContestStatTop3Mapper extends Mapper<Object, Text, Text, Text> { 2 3 private Text outKey = new Text(); 4 private Text outValue = new Text(); 5 6 @Override 7 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 8 // name, age, gender, score 9 String[] arr = value.toString().split(","); 10 outKey.set(arr[2]); // gender 11 outValue.set(arr[0] + "," + arr[1] + "," + arr[3]); // name,age,score 12 context.write(outKey, outValue); 13 } 14 } 15 16 class ContestStatTop3Reducer extends Reducer<Text, Text, Text, Text> { 17 18 19 private Text outKey = new Text(); 20 private Text outValue = new Text(); 21 22 private int maxSize = 3; 23 24 private List<ContestPerson> list = new ArrayList<>(maxSize); 25 26 @Override 27 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 28 String gender = key.toString(); 29 int pos =0; 30 int minScore = Integer.MAX_VALUE; 31 list.clear(); 32 for (Text t : values) { 33 String[] arr = t.toString().split(","); 34 // name,age,score 35 String name = arr[0]; 36 int age = Integer.parseInt(arr[1]); 37 int score = Integer.parseInt(arr[2]); 38 ContestPerson person = new ContestPerson(name,age,gender,score); 39 if (list.size() < maxSize) { //always add if the size is less than maxSize 40 list.add(person); 41 if (score <= minScore) { // update minScore 42 minScore = score; 43 pos = list.size() - 1;// remember the position of person with minScore 44 } 45 } else { 46 if(score > minScore){ 47 // remove the person with minScore 48 list.remove(pos); 49 // add the new person 50 list.add(person); 51 // update minScore 52 minScore = person.score; 53 pos = list.size() -1; 54 for(int i = 0; i< list.size();i++){ 55 if(minScore > list.get(i).score){ 56 minScore = list.get(i).score; 57 pos =i; 58 } 59 } 60 } 61 } 62 } 63 //output 64 outKey.set(gender); 65 for(ContestPerson person: list){ 66 outValue.set(person.toString()); 67 context.write(outKey, outValue); 
68 69 } 70 } 71 }
SQL 代码
-- Top 3 per gender, ranked by score DESC, then age ASC, then name ASC
-- (the same ordering as the Java reducers).
-- A row qualifies when fewer than 3 rows of the same gender rank strictly ahead of it.
-- The earlier "m.score <= s.score ... <= 3" form dropped every row tied at the cutoff:
-- scores 100, 90, 90, 90 returned only the 100.
-- Rows identical in all four columns cannot be told apart:
--   option 1 may return both copies;
--   option 2 merges them into one row.

-- option 1: correlated subquery
select m.name, m.age, m.gender, m.score
from scores as m
where (
    select count(*)
    from scores as s
    where s.gender = m.gender
      and (s.score > m.score
           or (s.score = m.score and s.age < m.age)
           or (s.score = m.score and s.age = m.age and s.name < m.name))
) < 3
order by m.gender, m.score desc, m.age, m.name;

-- option 2: self left join
-- count(s.gender) ignores the all-NULL row that LEFT JOIN produces
-- when nobody ranks ahead of m.
select m.name, m.age, m.gender, m.score
from scores as m
left join scores as s
  on s.gender = m.gender
 and (s.score > m.score
      or (s.score = m.score and s.age < m.age)
      or (s.score = m.score and s.age = m.age and s.name < m.name))
group by m.name, m.age, m.gender, m.score
having count(s.gender) < 3
order by m.gender, m.score desc, m.age, m.name;

-- option 3: window function (requires window-function support, e.g. MySQL 8.0+)
select name, age, gender, score
from (
    select s.name, s.age, s.gender, s.score,
           row_number() over (partition by s.gender order by s.score desc, s.age, s.name) as rn
    from scores as s
) as ranked
where rn <= 3
order by gender, score desc, age, name;
2.分组降序输出所有人的成绩。
Java代码
1 class ContestStatSortMapper extends Mapper<Object, Text, Text, Text> { 2 3 private Text outKey = new Text(); 4 private Text outValue = new Text(); 5 6 @Override 7 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 8 // name, age, gender, score 9 String[] arr = value.toString().split(","); 10 outKey.set(arr[2]); // gender 11 outValue.set(arr[0] + "," + arr[1] + "," + arr[3]); // name,age,score 12 context.write(outKey, outValue); 13 } 14 } 15 16 class ContestStatSortReducer extends Reducer<Text, Text, Text, Text> { 17 18 19 private Text outKey = new Text(); 20 private Text outValue = new Text(); 21 22 private int maxSize = 3; 23 24 private List<ContestPerson> list = new ArrayList<>(maxSize); 25 26 @Override 27 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 28 String gender = key.toString(); 29 list.clear(); 30 for (Text t : values) { 31 String[] arr = t.toString().split(","); 32 // name,age,score 33 String name = arr[0]; 34 int age = Integer.parseInt(arr[1]); 35 int score = Integer.parseInt(arr[2]); 36 ContestPerson person = new ContestPerson(name,age,gender,score); 37 list.add(person); 38 } 39 //sort by score desc, then by age asc, then by name asc 40 Collections.sort(list, new Comparator<ContestPerson>() { 41 @Override 42 public int compare(ContestPerson o1, ContestPerson o2) { 43 if (o1.score > o2.score) { 44 return -1; 45 } else if (o1.score < o2.score) { 46 return 1; 47 } else { 48 if (o1.age > o2.age) { 49 return 1; 50 } else if (o1.age < o2.age) { 51 return -1; 52 } else { 53 return o1.name.compareTo(o2.name); 54 } 55 } 56 } 57 }); 58 //output 59 outKey.set(gender); 60 for(ContestPerson person: list){ 61 outValue.set(person.toString()); 62 context.write(outKey, outValue); 63 } 64 } 65 }
SQL代码
-- Everyone, grouped by gender and ranked within each gender by
-- score (high to low), then age (young to old), then name.
SELECT name, age, gender, score
FROM scores
ORDER BY gender ASC,
         score  DESC,
         age    ASC,
         name   ASC;
附:Java代码中所使用的ContestPerson类
/**
 * One contestant record ({@code name,age,gender,score}) as handled by the contest
 * reducers.
 *
 * <p>The fields are public because the reducers read them directly when ranking.
 */
class ContestPerson {
    /** Contestant name. */
    public String name;
    /** Contestant age. */
    public int age;
    /** Contestant gender; also the grouping key of the MapReduce jobs. */
    public String gender;
    /** Contestant score. */
    public int score;

    public ContestPerson(String name, int age, String gender, int score) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.score = score;
    }

    /** Returns the record as CSV {@code name,age,gender,score}, the reducers' output value format. */
    @Override
    public String toString() {
        return this.name + "," + this.age + "," + this.gender + "," + this.score;
    }
}
以上是关于使用 MapReduce 实现分组排名的主要内容,如果未能解决你的问题,请参考以下文章
2018-08-09期 MapReduce实现对单个用户支付金额最大的前N个商品排名