Hadoop3 - MapReduce 分区介绍及自定义分区

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 分区介绍及自定义分区相关的知识,希望对你有一定的参考价值。

一、MapReduce 分区

上篇文章使用 COVID-19 对 MapReduce 进一步的案例理解,本篇文章讲解 MapReduce 分区,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127475811

在默认情况下,不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个reduce来处理,并且最终结果输出到一个文件中,此时,MapReduce的执行流程如下所示:


此时性能的瓶颈就在reduce阶段,那是不是可以多增加几个 reduce,肯定是可以的,在MapReduce程序的驱动类中,通过job提供的方法,可以修改reducetask的个数。

job.setNumReduceTasks(3);

默认情况下不设置,reducetask个数为1,会将结果输出到一个文件中。如果设置成3个,拿上篇文章的 COVID-19 案例进行执行:

执行后,查看输出目录:

可以看到有3个输出结果。

此时的 MapReduce 执行流程如下所示

二、分区规则及自定义分区

MapReduce中有多个reducetask执行的时候,此时maptask的输出到底给哪个 reducetask 来处理呢?其中有个分区规则,默认情况下采用 HashPartitioner 也就是根据 key % 分区数 确定到底是哪个分区:

那如果默认的无法满足我们,也是可以进行自定义分区规则的,只需要继承 Partitioner 类,然后在 getPartition 方法中返回具体的分区编号即可。

下面还是使用上篇文章的案例,如果我们对 Illinois、Arkansas、Texas、Florida、Indiana 五个州的数据统计放在一起,其他的州放在一起。

public class StatePartitioner extends Partitioner<Text, CountVO> 

    private static Map<String, Integer> stateMap = new ConcurrentHashMap<String, Integer>();

    static 
        stateMap.put("Illinois", 0);
        stateMap.put("Arkansas", 1);
        stateMap.put("Texas", 2);
        stateMap.put("Florida", 3);
        stateMap.put("Indiana", 4);
    

    @Override
    public int getPartition(Text key, CountVO value, int numPartitions) 
        return Objects.isNull(stateMap.get(key.toString())) ? 5 : stateMap.get(key.toString());
    

修改驱动类,设置 NumReduceTasks6PartitionerClass 为上面的分区类:

public class SumDriver extends Configured implements Tool 
    public static void main(String[] args) throws Exception
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new SumDriver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        
        // 创建作业实例
        Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountVO.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        job.setNumReduceTasks(6);
        job.setPartitionerClass(StatePartitioner.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);


        return job.waitForCompletion(true)? 0:1;
    

执行后,查看输出目录,可以得到 6 个结果:


查看第一个结果,都是 Illinois 州的:

查看第二个:

查看第三个:

查看最后一个:

已经实现自定义分区的效果。

以上是关于Hadoop3 - MapReduce 分区介绍及自定义分区的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 分组介绍及案例实践

Hadoop3 - MapReduce 分组介绍及案例实践

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce COVID-19 案例实践