Hadoop使用采样器实现全排序

Posted 健哥说编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop使用采样器实现全排序相关的知识,希望对你有一定的参考价值。

排序

排序是MapRedue的核心技术。

1、部分排序:每一个Reducer输出一个part-r-xxxx的文件,这里面的数据都是已经根据key进行排序的。但,不能保证part-r-00001part-r-00002是两个文件中的内容是顺序排序的。如:part-r-00000文件内容如下:

1

3

5

part-r-00001文件内容如下:

0

2

5

7

上面这种输出就是部分排序,就是每一个文件中的内容,独立排序。

部分排序图示:

 


 

2、全排序:如何使用Hadoop产生一个全局排序的文件?最简单的方法就是使用一个分区(a single partition[即只需要一个Reducer])。但此方法在处理大型文件时效率极低,因为一台机器必须要处理所有输出的文件,从而完全丧失了MapReduce所提供的并行框架的优势 。

全排序序图示:

 


重要提示:

1:首先,必须要先有创建的一系列已经排序好的文件。

2:串联这些文件,主要是通过Partitioner来描述输出的全局排序。(使用TotalOrderPartitioner类)

 

3、通过对键空间进行采样,就可以较为均匀的划分数据集。采样的核心思想是只查看一小部分键,以获得键的近似分布,并由此构建键分区。

4hadoop已经内置了若干采样器。这些类都在:

org.apache.hadoop.mapreduce.lib.partition.InputSampler的内部类中。如:

 


 

1、在使用hadoop jar运行一个job时,我们可以使用-D mapreduce.job.reduces=n来指定reduce的个数,然后查看每一个part-r-0000n文件中输出的数据。默认情况下,应该都是部分排序。

 

示例-全排序

1:准备数据

1、现在我们将Text文件转换成<LongWritable,Text>类型的顺序文件保存。

2、顺序文件SequenceFileLongWritable做为key保存,便于排序。(经过本人几次测试,全排序最好的输入类型就是SequenceFile顺序文件类型)。

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * 用于统计每一行字符的个数<br>
 * 使用以下命令:<br>
 *  hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_01_LetterCountToSequenceFile \<br>
 *  -D mapreduce.job.reduces=3 /test /out001<br>
 *  <b>注意,使用-D mapreduce.job.reduces=3指定Reduce的个数</b><br>
 *  <b>这样会输出一个部分排序的三个reduce文件</b>
 * @author wangjian
 * @version 1.0 2018/5/26 0026
 */
public class Demo23_01_LetterCountToSequenceFile extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage : input output");
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path dest = new Path(args[1]);
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        Job job = Job.getInstance(config, "LetterCounter");
        job.setJarByClass(getClass());
        //添加Mapper
        job.setMapperClass(MyMapper.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //输出顺序文件,顺序文件的key就是LongWritable类型,便于排序
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, dest);
        job.setOutputKeyClass(LongWritable.class);//顺序文件输出的类型k,v类型
        job.setOutputValueClass(Text.class);
        int code = job.waitForCompletion(true)?0:1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo23_01_LetterCountToSequenceFile(), args);
        System.exit(code);
    }
    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private LongWritable key = new LongWritable();
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            this.key.set(value.toString().length());//获取某一个行字符的个数
            context.write(this.key, value);
        }
    }
}

执行以下命令,重点是指定Reduce的个数,以便于生成多个部分排序的文件:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_01_LetterCountToSequenceFile \

-D mapreduce.job.reduces=3 /test /out001

查看生成的文件:

$ hdfs dfs -ls /out001

Found 4 items

-rw-r--r--   1 wangjian supergroup          0 2018-05-26 21:55 /out001/_SUCCESS

-rw-r--r--   1 wangjian supergroup       8416 2018-05-26 21:55 /out001/part-r-00000

-rw-r--r--   1 wangjian supergroup       9403 2018-05-26 21:55 /out001/part-r-00001

-rw-r--r--   1 wangjian supergroup       7114 2018-05-26 21:55 /out001/part-r-00002

现在可以通过查看每一个文件,获取里面的内容。每一个文件,都是排序的文件,但文件之间确没有排序的特点。这就是部分排序。使用-text查看顺序文件中的内容:

$ hdfs dfs -text /out001/part-r-00000

2:全排序

1、指定Partitioner即分区使用TotalOrderPartitioner

2、TotalOrderPartitioner需要使用采样器,以便于进行数据排序。

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/**
 * 用于将已经部分排序的文件实现全排序<br>
 * hadoop jar mr.jar Demo23_02_LetterCountTotalOrder -D mapreduce.job.reduces=3 /in /out
 * @author wangjian
 * @version 1.0 2018/5/26 0026
 */
public class Demo23_02_LetterCountTotalOrder extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage : input output");
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path dest = new Path(args[1]);
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        //声明Job
        Job job = Job.getInstance(config, "LetterCounterTotal");
        job.setJarByClass(getClass());
        //设置读取顺序文件
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //以文本形式输出数据
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, dest);
        job.setOutputKeyClass(LongWritable.class);//顺序文件输出的类型k,v类型
        job.setOutputValueClass(Text.class);
        //使用TotalOrderPartitioner处理分区
        job.setPartitionerClass(TotalOrderPartitioner.class);
        //声明采样器,0.1:采样率。100,10为最大样本和最大分区。只要满足任何一个条件即停止采样
        InputSampler.Sampler<IntWritable,Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1D,100,10);
        InputSampler.writePartitionFile(job,sampler);
        //可选的设置缓存
        String partitionFile = TotalOrderPartitioner.getPartitionFile(config);
        job.addCacheFile(new URI(partitionFile));
        int code = job.waitForCompletion(true)?0:1;
        return code;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new Demo23_02_LetterCountTotalOrder(), args);
        System.exit(code);
    }
}

 

运行并查看结果:

$ hadoop jar mr.jar cn.wangjian.mapreduce.Demo23_02_LetterCountTotalOrder \

-D mapreduce.job.reduces=3 /out001 /out002


以上是关于Hadoop使用采样器实现全排序的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop---mapreduce排序和二次排序以及全排序

9.2.2 hadoop全排序实例详解

大数据之Hadoop(MapReduce):WritableComparable排序案例实操(全排序)

1hive实现全排序

Hadoop中的MapReduce框架原理WritableComparable排序排序分类WritableComparable排序案例实操(全排序)(二次排序)

WritableComparable(排序)