MapReduce排序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce排序相关的知识,希望对你有一定的参考价值。
一:背景
数据排序是许多实际任务执行时需要完成的第一项工作,比如给销售额进行排名、求top N等操作都需要用到排序,使用MapReduce对数据进行简单排序思路是非常简单的。因为MapReduce本身就是支持排序的,MapReduce默认是对Key进行排序,我们可以将要排序的数据作为key进行输出就自动完成排序咯。
二:技术实现
#需求:现有如下数据,按从小到大进行排列
- 1
- 23
- 32
- 1
- 3
- 8
- 8
- 9
- 99
- 100
注:数据中有相同的数字,需要采用取巧的方法,保留相同的数据(见源代码)。
实现代码:
- public class SimpleSortMapReduce {
- // 定义输入输出路径
- private static final String INPATH = "hdfs://liaozhongmin21:8020/sortFiles/*";
- private static final String OUTPATH = "hdfs://liaozhongmin21:8020/out";
- public static void main(String[] args) {
- try {
- // 创建配置
- Configuration conf = new Configuration();
- // 创建FileSystem
- FileSystem fileSystem = FileSystem.get(new URI(OUTPATH), conf);
- // 判断输出文件是否存在,如果存在就进行删除
- if (fileSystem.exists(new Path(OUTPATH))) {
- fileSystem.delete(new Path(OUTPATH), true);
- }
- // 创建Job
- Job job = new Job(conf, SimpleSortMapReduce.class.getName());
- // 设置输入文件的输入格式
- job.setInputFormatClass(TextInputFormat.class);
- // 设置输入目录
- FileInputFormat.setInputPaths(job, new Path(INPATH));
- // 设置自定义Mapper
- job.setMapperClass(SimpleSortMapper.class);
- // 设置Mapper输出的Key和Value
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(Text.class);
- // 设置分区
- job.setPartitionerClass(HashPartitioner.class);
- // 设置Reducer的个数
- job.setNumReduceTasks(1);
- // 设置自定义的Reducer
- job.setReducerClass(SimpleSortReducer.class);
- // 设置输出的格式化类
- job.setOutputFormatClass(TextOutputFormat.class);
- // 设置输出目录
- FileOutputFormat.setOutputPath(job, new Path(OUTPATH));
- // 设置输出的key和value
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(IntWritable.class);
- // 提交任务
- System.exit(job.waitForCompletion(true) ? 1 : 0);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class SimpleSortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
- // 定义输出的key和value
- private IntWritable outKey = new IntWritable();
- private Text outValue = new Text("");
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException,
- InterruptedException {
- // 获取行文本内容
- String line = value.toString();
- if (!(line == null || "".equals(line))) {
- // 设置输出的key
- outKey.set(Integer.parseInt(line));
- // 把结果写出去(把真实值作为key,value设置为空,到达reduce后,如:<1,{"","",""}>,可以通过遍历values的方法避免相同的数字被去重)
- context.write(outKey, outValue);
- }
- }
- }
- public static class SimpleSortReducer extends Reducer<IntWritable, Text, IntWritable, IntWritable> {
- // 定义输出的key
- private IntWritable outKey = new IntWritable(1);
- @Override
- protected void reduce(IntWritable key, Iterable<Text> values, Reducer<IntWritable, Text, IntWritable, IntWritable>.Context context) throws IOException,
- InterruptedException {
- // 遍历集合把结果写出去(这里主要是为了把相同的数字也遍历出来,遍历只是为了防止相同的数字遗漏)
- for (Text val : values) {
- context.write(outKey, key);
- // key值自加1
- outKey.set(outKey.get() + 1);
- }
- }
- }
- }
程序运行结果:
以上是关于MapReduce排序的主要内容,如果未能解决你的问题,请参考以下文章