MR实现全排序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MR实现全排序相关的知识,希望对你有一定的参考价值。
参考技术A默认情况下,mr只对key排序。我们所说的全排序,即对key的全排序。
这个是最容易想到的思路
优点是实现简单,
缺点也很明显,一个reduce有可能比较慢,无法利用分布式的优点
通过重写Partition类,把key在一个范围内的发往一个固定的reduce,这样在一个reduce内key是全排序的,在reduce之间按照序号也是排好序的。比如key代表的是一个年龄。我们可以把数据输出到10个reduer。1-10岁之间发往第0个reduce,11-20发往第2个reduce,以此类推。
缺点 是这种划分可能不均匀。
所以全排序的大概思路为:确保Partition之间是有序的就OK了,即保证Partition1的最大值小于Partition2的最小值就OK了。
即便这样做也还是有个问题:Partition的分布不均,可能导致某些Partition处理的数据量远大于其他Partition处理的数据量。
而实现全排序的核心步骤为:取样和Partition。
先“取样”,保证Partition得更均匀:
我们知道Mapreduce框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,hadoop 默认的partitioner是HashPartitioner,它依赖于output key的hashcode,使得相同key会去相同reducer,但是不保证全局有序,如果想要获得全局排序结果(比如获取top N, bottom N),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。
TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。
InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:
参考: MapReduce 实现全排序的方式
十二道MR习题 – 1 – 排序
题目:
一个文件,大小约为100G。文件的每一行都是一个数字,要求对文件中的所有数字进行排序。
对于这个题目,了解过Hadoop的同学可以笑而不语了。即使用spark实现也是非常简单的事情。
先说下如何用Hadoop实现。实际上也没什么好说的:Map任务逐行读入数字,而后在Reduce中输出就可以了,简单粗暴到令人发指。
看下代码好了:
package com.zhyea.dev; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class NumberSort { public static class SplitterMapper extends Mapper<Object, Text, IntWritable, IntWritable> { private static final IntWritable intWritable = new IntWritable(); @Override public void map(Object key, Text value, Context context) { try { int num = Integer.valueOf(value.toString()); intWritable.set(num); context.write(intWritable, intWritable); } catch (Exception e) { e.printStackTrace(); } } } public static class IntegrateReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { @Override public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) { try { context.write(key, key); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "number-sort"); job.setJarByClass(NumberSort.class); job.setMapperClass(SplitterMapper.class); job.setReducerClass(IntegrateReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在map方法中,输出值的Value部分我选择了一个IntWritable的值。Value值的类型也是可以设置为NullWritable的,不过这样map任务执行起来会比较慢,虽然reduce任务执行的会快一些,但是终究是得不偿失。
在我们的程序里没有执行任何排序的动作,但是输出的结果是有序的,这是因为在shuffle阶段已经完成了排序(一次快速排序,一次归并排序)。
再来看看用spark是如何完成的:
object NumSortJob { def main(args: Array[String]): Unit = { val inputPath = args(0) val outputPath = args(1) val conf = new SparkConf().setAppName("Num Sort") val sc = new SparkContext(conf) val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath) data.map(p => p._2.toString.toInt).distinct().sortBy[Int](p => p).coalesce(1, true).saveAsTextFile(outputPath) } }
spark则需要主动进行排序。即使选择了使用sortBasedShuffle,它的排序也仅止于mapper端的排序,结果集不一定是有序的。
#########
以上是关于MR实现全排序的主要内容,如果未能解决你的问题,请参考以下文章