Hadoop使用采样器实现全排序
Posted 健哥说编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop使用采样器实现全排序相关的知识,希望对你有一定的参考价值。
排序
排序是MapRedue的核心技术。
1、部分排序:每一个Reducer输出一个part-r-xxxx的文件,这里面的数据都是已经根据key进行排序的。但,不能保证part-r-00001与part-r-00002是两个文件中的内容是顺序排序的。如:part-r-00000文件内容如下:
1
3
5
而part-r-00001文件内容如下:
0
2
5
7
上面这种输出就是部分排序,就是每一个文件中的内容,独立排序。
部分排序图示:
2、全排序:如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区(a single partition,[即只需要一个Reducer])。但此方法在处理大型文件时效率极低,因为一台机器必须要处理所有输出的文件,从而完全丧失了MapReduce所提供的并行框架的优势 。
全排序序图示:
重要提示:
1:首先,必须要先有创建的一系列已经排序好的文件。
2:串联这些文件,主要是通过Partitioner来描述输出的全局排序。(使用TotalOrderPartitioner类)。
3、通过对键空间进行采样,就可以较为均匀的划分数据集。采样的核心思想是只查看一小部分键,以获得键的近似分布,并由此构建键分区。
4、hadoop已经内置了若干采样器。这些类都在:
org.apache.hadoop.mapreduce.lib.partition.InputSampler的内部类中。如:
1、在使用hadoop jar运行一个job时,我们可以使用-D mapreduce.job.reduces=n来指定reduce的个数,然后查看每一个part-r-0000n文件中输出的数据。默认情况下,应该都是部分排序。
示例-全排序
步1:准备数据
1、现在我们将Text文件转换成<LongWritable,Text>类型的顺序文件保存。
2、顺序文件SequenceFile以LongWritable做为key保存,便于排序。(经过本人几次测试,全排序最好的输入类型就是SequenceFile顺序文件类型)。
package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* 用于统计每一行字符的个数<br>
* 使用以下命令:<br>
* hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_01_LetterCountToSequenceFile \<br>
* -D mapreduce.job.reduces=3 /test /out001<br>
* <b>注意,使用-D mapreduce.job.reduces=3指定Reduce的个数</b><br>
* <b>这样会输出一个部分排序的三个reduce文件</b>
* @author wangjian
* @version 1.0 2018/5/26 0026
*/
public class Demo23_01_LetterCountToSequenceFile extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("usage : input output");
return -1;
}
Configuration config = getConf();
FileSystem fs = FileSystem.get(config);
Path dest = new Path(args[1]);
if (fs.exists(dest)) {
fs.delete(dest, true);
}
Job job = Job.getInstance(config, "LetterCounter");
job.setJarByClass(getClass());
//添加Mapper
job.setMapperClass(MyMapper.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
//输出顺序文件,顺序文件的key就是LongWritable类型,便于排序
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileOutputFormat.setOutputPath(job, dest);
job.setOutputKeyClass(LongWritable.class);//顺序文件输出的类型k,v类型
job.setOutputValueClass(Text.class);
int code = job.waitForCompletion(true)?0:1;
return code;
}
public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new Demo23_01_LetterCountToSequenceFile(), args);
System.exit(code);
}
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
private LongWritable key = new LongWritable();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
this.key.set(value.toString().length());//获取某一个行字符的个数
context.write(this.key, value);
}
}
}
执行以下命令,重点是指定Reduce的个数,以便于生成多个部分排序的文件:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_01_LetterCountToSequenceFile \
-D mapreduce.job.reduces=3 /test /out001
查看生成的文件:
$ hdfs dfs -ls /out001
Found 4 items
-rw-r--r-- 1 wangjian supergroup 0 2018-05-26 21:55 /out001/_SUCCESS
-rw-r--r-- 1 wangjian supergroup 8416 2018-05-26 21:55 /out001/part-r-00000
-rw-r--r-- 1 wangjian supergroup 9403 2018-05-26 21:55 /out001/part-r-00001
-rw-r--r-- 1 wangjian supergroup 7114 2018-05-26 21:55 /out001/part-r-00002
现在可以通过查看每一个文件,获取里面的内容。每一个文件,都是排序的文件,但文件之间确没有排序的特点。这就是部分排序。使用-text查看顺序文件中的内容:
$ hdfs dfs -text /out001/part-r-00000
步2:全排序
1、指定Partitioner即分区使用TotalOrderPartitioner。
2、TotalOrderPartitioner需要使用采样器,以便于进行数据排序。
package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
/**
* 用于将已经部分排序的文件实现全排序<br>
* hadoop jar mr.jar Demo23_02_LetterCountTotalOrder -D mapreduce.job.reduces=3 /in /out
* @author wangjian
* @version 1.0 2018/5/26 0026
*/
public class Demo23_02_LetterCountTotalOrder extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("usage : input output");
return -1;
}
Configuration config = getConf();
FileSystem fs = FileSystem.get(config);
Path dest = new Path(args[1]);
if (fs.exists(dest)) {
fs.delete(dest, true);
}
//声明Job
Job job = Job.getInstance(config, "LetterCounterTotal");
job.setJarByClass(getClass());
//设置读取顺序文件
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
//以文本形式输出数据
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, dest);
job.setOutputKeyClass(LongWritable.class);//顺序文件输出的类型k,v类型
job.setOutputValueClass(Text.class);
//使用TotalOrderPartitioner处理分区
job.setPartitionerClass(TotalOrderPartitioner.class);
//声明采样器,0.1:采样率。100,10为最大样本和最大分区。只要满足任何一个条件即停止采样
InputSampler.Sampler<IntWritable,Text> sampler =
new InputSampler.RandomSampler<IntWritable, Text>(0.1D,100,10);
InputSampler.writePartitionFile(job,sampler);
//可选的设置缓存
String partitionFile = TotalOrderPartitioner.getPartitionFile(config);
job.addCacheFile(new URI(partitionFile));
int code = job.waitForCompletion(true)?0:1;
return code;
}
public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new Demo23_02_LetterCountTotalOrder(), args);
System.exit(code);
}
}
运行并查看结果:
$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_02_LetterCountTotalOrder \
-D mapreduce.job.reduces=3 /out001 /out002
以上是关于Hadoop使用采样器实现全排序的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop---mapreduce排序和二次排序以及全排序
大数据之Hadoop(MapReduce):WritableComparable排序案例实操(全排序)
Hadoop中的MapReduce框架原理WritableComparable排序排序分类WritableComparable排序案例实操(全排序)(二次排序)