Hadoop3 - MapReduce 分区介绍及自定义分区
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 分区介绍及自定义分区相关的知识,希望对你有一定的参考价值。
一、MapReduce 分区
上篇文章使用 COVID-19 对 MapReduce 进一步的案例理解,本篇文章讲解 MapReduce 分区,下面是上篇文章的地址:
在默认情况下,不管map阶段有多少个并发执行task
,到reduce
阶段,所有的结果都将有一个reduce
来处理,并且最终结果输出到一个文件中,此时,MapReduce
的执行流程如下所示:
此时性能的瓶颈就在reduce
阶段,那是不是可以多增加几个 reduce
,肯定是可以的,在MapReduce
程序的驱动类中,通过job
提供的方法,可以修改reducetask
的个数。
job.setNumReduceTasks(3);
默认情况下不设置,reducetask
个数为1,会将结果输出到一个文件中。如果设置成3
个,拿上篇文章的 COVID-19
案例进行执行:
执行后,查看输出目录:
可以看到有3
个输出结果。
此时的 MapReduce
执行流程如下所示
二、分区规则及自定义分区
当MapReduce
中有多个reducetask
执行的时候,此时maptask
的输出到底给哪个 reducetask
来处理呢?其中有个分区规则,默认情况下采用 HashPartitioner
也就是根据 key % 分区数
确定到底是哪个分区:
那如果默认的无法满足我们,也是可以进行自定义分区规则的,只需要继承 Partitioner
类,然后在 getPartition
方法中返回具体的分区编号即可。
下面还是使用上篇文章的案例,如果我们对 Illinois、Arkansas、Texas、Florida、Indiana
五个州的数据统计放在一起,其他的州放在一起。
public class StatePartitioner extends Partitioner<Text, CountVO>
private static Map<String, Integer> stateMap = new ConcurrentHashMap<String, Integer>();
static
stateMap.put("Illinois", 0);
stateMap.put("Arkansas", 1);
stateMap.put("Texas", 2);
stateMap.put("Florida", 3);
stateMap.put("Indiana", 4);
@Override
public int getPartition(Text key, CountVO value, int numPartitions)
return Objects.isNull(stateMap.get(key.toString())) ? 5 : stateMap.get(key.toString());
修改驱动类,设置 NumReduceTasks
为 6
,PartitionerClass
为上面的分区类:
public class SumDriver extends Configured implements Tool
public static void main(String[] args) throws Exception
//配置文件对象
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new SumDriver(), args);
System.exit(status);
@Override
public int run(String[] args) throws Exception
// 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
// 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 输出目录必须为空,如果不为空则会报错提示
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(output))
fs.delete(output,true);
// 创建作业实例
Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(SumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(SumMapper.class);
job.setReducerClass(SumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CountVO.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountVO.class);
job.setNumReduceTasks(6);
job.setPartitionerClass(StatePartitioner.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, input);
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true)? 0:1;
执行后,查看输出目录,可以得到 6 个结果:
查看第一个结果,都是 Illinois
州的:
查看第二个:
查看第三个:
查看最后一个:
已经实现自定义分区的效果。
以上是关于Hadoop3 - MapReduce 分区介绍及自定义分区的主要内容,如果未能解决你的问题,请参考以下文章