Hadoop3 - MapReduce 分组介绍及案例实践
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 分组介绍及案例实践相关的知识,希望对你有一定的参考价值。
一、MapReduce 分组
上篇文章对 MapReduce
分区进行了介绍,通过分区规则控制不同的数据进到不同的 reducetask
中,而本篇文章讲的分组则是进到同一个 reducetask
中的数据的归类分组规则,下面是上篇文章的地址:
分组在发生在reduce
阶段,决定了同一个reduce
中哪些数据将组成一组调用一次reduce
方法处理。默认分组规则是:key
相同的就会分为一组(前后两个key直接比较是否相等)。在reduce
阶段进行分组之前,会首先进行数据排序行为(key 的 compareTo 方法
)。
如果需要自定义规则,只需继承 WritableComparator
,重写Compare
方法,如果返回结果是 0
则认为前后两个相等分为一组,该类需要在job
对象中进行设置:
job.setGroupingComparatorClass(xxxx.class);
注意点:
在分组后有可能出现对reduce
参数values
遍历时发现 key
的值也会变化,比如: key
值是个对象,其中有 a,b
两个属性,其中自定义分组中根据 a
相同则认为一组,可以发现在遍历 values
的时候 key
的 b
是变化的(前提是Map
阶段的 a
相同 而 b
不相同)。
下面利用上面的特征对前面文章讲解的 COVID-19
案例进行进一步分析,下面是该文章地址:
在这篇文章中我们有对每个州的 deaths
筛选出Top3
的县,当时使用的 Java
的 List
排序进行筛选的,这种情况在数据量巨大的情况下很容易产生 OOM
,因此可以基于自定义排序加分组的方式计算出 Top3
的县 。
二、计算每个州的 deaths 筛选出Top3的县
修改 CountVO
类,修改排序规则,根据 state 正序排列,相同的然后再根据 deaths 倒序排序:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO>
private String state;//州
private String county;//县
private Long deaths;//死亡病例
public void set(String state, String county, Long deaths)
this.state = state;
this.county = county;
this.deaths = deaths;
@Override
public void write(DataOutput out) throws IOException
out.writeUTF(state);
out.writeUTF(county);
out.writeLong(deaths);
@Override
public void readFields(DataInput in) throws IOException
this.state = in.readUTF();
this.county = in.readUTF();
this.deaths = in.readLong();
//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
@Override
public int compareTo(CountVO o)
int i = this.state.compareTo(o.getState());
if (i == 0)
return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
return i;
@Override
public String toString()
return "CountVO" +
"state='" + state + '\\'' +
", county='" + county + '\\'' +
", deaths=" + deaths +
'';
编写自定义分组,继承 WritableComparator
,并使用 state
分组。
public class Top3GroupingComparator extends WritableComparator
protected Top3GroupingComparator()
super(CountVO.class,true);
@Override
public int compare(WritableComparable a, WritableComparable b)
CountVO aBean = (CountVO) a;
CountVO bBean = (CountVO) b;
return aBean.getState().compareTo(bBean.getState());
编写 Mapper
类,由于对 key
进行了排序和分组,后面 Reducer
阶段直接对 key
操作即可,这里 value
给个 NullWritable
站位吧:
public class Top3Mapper extends Mapper<LongWritable, Text, CountVO, NullWritable>
CountVO outKey = new CountVO();
NullWritable outValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String[] fields = value.toString().split(",");
//封装数据: 州、县、死亡病例
outKey.set(fields[2], fields[1], Long.parseLong(fields[fields.length - 1]));
context.write(outKey, outValue);
编写 Reducer
由于这边对 state
进行了分组,所以 reduce
中的都是同一个 state
的,但是在 Mapper
阶段给的 county
和 deaths
都是不同的,因此可以通过便利 values
切换 key
的内容,而 key 则根据 deaths
降序排列了,因此前 3
个就是 Top3
:
public class Top3Reducer extends Reducer<CountVO, NullWritable, CountVO, NullWritable>
@Override
protected void reduce(CountVO key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
int num = 1;
for (NullWritable value : values)
if (num > 3)
break;
context.write(key, value);
num++;
最后编写驱动类,指定 Mapper
和 Reducer
以及 自定义分组类:
public class Top3Driver extends Configured implements Tool
public static void main(String[] args) throws Exception
//配置文件对象
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new Top3Driver(), args);
System.exit(status);
@Override
public int run(String[] args) throws Exception
// 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
// 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 输出目录必须为空,如果不为空则会报错提示
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(output))
fs.delete(output,true);
// 创建作业实例
Job job = Job.getInstance(getConf(), Top3Driver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(Top3Driver.class);
// 设置作业mapper reducer类
job.setMapperClass(Top3Mapper.class);
job.setReducerClass(Top3Reducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CountVO.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CountVO.class);
job.setOutputValueClass(NullWritable.class);
//指定分组规则
job.setGroupingComparatorClass(Top3GroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, input);
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true)? 0:1;
运行驱动类,指定数据目录以及输出目录:
执行成功后,到输出目录中查看结果。
以上是关于Hadoop3 - MapReduce 分组介绍及案例实践的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop3 - MapReduce 分区介绍及自定义分区