MapReduce排序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce排序相关的知识,希望对你有一定的参考价值。

一:背景

数据排序是许多实际任务执行时需要完成的第一项工作,比如给销售额进行排名、求top N等操作都需要用到排序,使用MapReduce对数据进行简单排序思路是非常简单的。因为MapReduce本身就是支持排序的,MapReduce默认是对Key进行排序,我们可以将要排序的数据作为key进行输出就自动完成排序咯。

 

二:技术实现

#需求:现有如下数据,按从小到大进行排列

 

[html] view plain copy
 
  1. 1  
  2. 23  
  3. 32  
  4. 1  
  5. 3  
  6. 8  
  7. 8  
  8. 9  
  9. 99  
  10. 100  

 

注:数据中有相同的数字,需要采用取巧的方法,保留相同的数据(见源代码)。

 

实现代码:

 

[html] view plain copy
 
  1. public class SimpleSortMapReduce {  
  2.     // 定义输入输出路径  
  3.     private static final String INPATH = "hdfs://liaozhongmin21:8020/sortFiles/*";  
  4.     private static final String OUTPATH = "hdfs://liaozhongmin21:8020/out";  
  5.   
  6.     public static void main(String[] args) {  
  7.         try {  
  8.             // 创建配置  
  9.             Configuration conf = new Configuration();  
  10.   
  11.             // 创建FileSystem  
  12.             FileSystem fileSystem = FileSystem.get(new URI(OUTPATH), conf);  
  13.             // 判断输出文件是否存在,如果存在就进行删除  
  14.             if (fileSystem.exists(new Path(OUTPATH))) {  
  15.                 fileSystem.delete(new Path(OUTPATH), true);  
  16.             }  
  17.   
  18.             // 创建Job  
  19.             Job job = new Job(conf, SimpleSortMapReduce.class.getName());  
  20.   
  21.             // 设置输入文件的输入格式  
  22.             job.setInputFormatClass(TextInputFormat.class);  
  23.             // 设置输入目录  
  24.             FileInputFormat.setInputPaths(job, new Path(INPATH));  
  25.   
  26.             // 设置自定义Mapper  
  27.             job.setMapperClass(SimpleSortMapper.class);  
  28.   
  29.             // 设置Mapper输出的Key和Value  
  30.             job.setMapOutputKeyClass(IntWritable.class);  
  31.             job.setMapOutputValueClass(Text.class);  
  32.   
  33.             // 设置分区  
  34.             job.setPartitionerClass(HashPartitioner.class);  
  35.             // 设置Reducer的个数  
  36.             job.setNumReduceTasks(1);  
  37.   
  38.             // 设置自定义的Reducer  
  39.             job.setReducerClass(SimpleSortReducer.class);  
  40.   
  41.             // 设置输出的格式化类  
  42.             job.setOutputFormatClass(TextOutputFormat.class);  
  43.             // 设置输出目录  
  44.             FileOutputFormat.setOutputPath(job, new Path(OUTPATH));  
  45.   
  46.             // 设置输出的key和value  
  47.             job.setOutputKeyClass(IntWritable.class);  
  48.             job.setOutputValueClass(IntWritable.class);  
  49.   
  50.             // 提交任务  
  51.             System.exit(job.waitForCompletion(true) ? 1 : 0);  
  52.   
  53.         } catch (Exception e) {  
  54.             e.printStackTrace();  
  55.         }  
  56.     }  
  57.   
  58.     public static class SimpleSortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {  
  59.         // 定义输出的key和value  
  60.         private IntWritable outKey = new IntWritable();  
  61.         private Text outValue = new Text("");  
  62.   
  63.         @Override  
  64.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException,  
  65.                 InterruptedException {  
  66.             // 获取行文本内容  
  67.             String line = value.toString();  
  68.             if (!(line == null || "".equals(line))) {  
  69.                 // 设置输出的key  
  70.                 outKey.set(Integer.parseInt(line));  
  71.                 // 把结果写出去(把真实值作为key,value设置为空,到达reduce后,如:<1,{"","",""}>,可以通过遍历values的方法避免相同的数字被去重)  
  72.                 context.write(outKey, outValue);  
  73.             }  
  74.   
  75.         }  
  76.     }  
  77.   
  78.     public static class SimpleSortReducer extends Reducer<IntWritable, Text, IntWritable, IntWritable> {  
  79.         // 定义输出的key  
  80.         private IntWritable outKey = new IntWritable(1);  
  81.   
  82.         @Override  
  83.         protected void reduce(IntWritable key, Iterable<Text> values, Reducer<IntWritable, Text, IntWritable, IntWritable>.Context context) throws IOException,  
  84.                 InterruptedException {  
  85.             // 遍历集合把结果写出去(这里主要是为了把相同的数字也遍历出来,遍历只是为了防止相同的数字遗漏)  
  86.             for (Text val : values) {  
  87.                 context.write(outKey, key);  
  88.                 // key值自加1  
  89.                 outKey.set(outKey.get() + 1);  
  90.             }  
  91.         }  
  92.     }  
  93. }  


程序运行结果:

 

技术分享

以上是关于MapReduce排序的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce二次排序

MapReduce实现手机上网日志分析(排序)

MapReduce TopK问题实际应用

mongoDB统计数据--mapReduce实现

MapReduce二次排序

MapReduce的WritableComparable 排序