Hadoop3 - MapReduce 分组介绍及案例实践

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 分组介绍及案例实践相关的知识,希望对你有一定的参考价值。

一、MapReduce 分组

上篇文章对 MapReduce 分区进行了介绍,通过分区规则控制不同的数据进到不同的 reducetask 中,而本篇文章讲的分组则是进到同一个 reducetask 中的数据的归类分组规则,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127477363

分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组调用一次reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。在reduce阶段进行分组之前,会首先进行数据排序行为(key 的 compareTo 方法)。

如果需要自定义规则,只需继承 WritableComparator,重写Compare方法,如果返回结果是 0 则认为前后两个相等分为一组,该类需要在job 对象中进行设置:

job.setGroupingComparatorClass(xxxx.class);

注意点:
在分组后有可能出现对reduce参数values遍历时发现 key 的值也会变化,比如: key 值是个对象,其中有 a,b 两个属性,其中自定义分组中根据 a 相同则认为一组,可以发现在遍历 values 的时候 keyb 是变化的(前提是Map 阶段的 a 相同 而 b 不相同)。

下面利用上面的特征对前面文章讲解的 COVID-19 案例进行进一步分析,下面是该文章地址:

https://blog.csdn.net/qq_43692950/article/details/127475811

在这篇文章中我们有对每个州的 deaths 筛选出Top3的县,当时使用的 JavaList 排序进行筛选的,这种情况在数据量巨大的情况下很容易产生 OOM,因此可以基于自定义排序加分组的方式计算出 Top3的县 。

二、计算每个州的 deaths 筛选出Top3的县

修改 CountVO 类,修改排序规则,根据 state 正序排列,相同的然后再根据 deaths 倒序排序:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> 

    private String state;//州
    private String county;//县
    private Long deaths;//死亡病例

    public void set(String state, String county, Long deaths) 
        this.state = state;
        this.county = county;
        this.deaths = deaths;
    

    @Override
    public void write(DataOutput out) throws IOException 
        out.writeUTF(state);
        out.writeUTF(county);
        out.writeLong(deaths);
    

    @Override
    public void readFields(DataInput in) throws IOException 
        this.state = in.readUTF();
        this.county = in.readUTF();
        this.deaths = in.readLong();
    

    //排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
    @Override
    public int compareTo(CountVO o) 
        int i = this.state.compareTo(o.getState());
        if (i == 0)
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        
        return i;
    

    @Override
    public String toString() 
        return "CountVO" +
                "state='" + state + '\\'' +
                ", county='" + county + '\\'' +
                ", deaths=" + deaths +
                '';
    


编写自定义分组,继承 WritableComparator ,并使用 state 分组。

public class Top3GroupingComparator extends WritableComparator 

    protected Top3GroupingComparator()
        super(CountVO.class,true);
    

    @Override
    public int compare(WritableComparable a, WritableComparable b) 
        CountVO aBean = (CountVO) a;
        CountVO bBean = (CountVO) b;
        return aBean.getState().compareTo(bBean.getState());
    

编写 Mapper 类,由于对 key 进行了排序和分组,后面 Reducer 阶段直接对 key 操作即可,这里 value 给个 NullWritable 站位吧:

public class Top3Mapper extends Mapper<LongWritable, Text, CountVO, NullWritable> 

    CountVO outKey = new CountVO();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split(",");
        //封装数据: 州、县、死亡病例
        outKey.set(fields[2], fields[1], Long.parseLong(fields[fields.length - 1]));
        context.write(outKey, outValue);
    

编写 Reducer 由于这边对 state 进行了分组,所以 reduce 中的都是同一个 state 的,但是在 Mapper 阶段给的 countydeaths 都是不同的,因此可以通过便利 values 切换 key 的内容,而 key 则根据 deaths 降序排列了,因此前 3 个就是 Top3

public class Top3Reducer extends Reducer<CountVO, NullWritable, CountVO, NullWritable> 
    @Override
    protected void reduce(CountVO key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 
        int num = 1;
        for (NullWritable value : values) 
            if (num > 3) 
                break;
            
            context.write(key, value);
            num++;
        
    

最后编写驱动类,指定 MapperReducer 以及 自定义分组类:

public class Top3Driver extends Configured implements Tool 

    public static void main(String[] args) throws Exception
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new Top3Driver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        
        // 创建作业实例
        Job job = Job.getInstance(getConf(), Top3Driver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(Top3Driver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(Top3Mapper.class);
        job.setReducerClass(Top3Reducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CountVO.class);
        job.setMapOutputValueClass(NullWritable.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(CountVO.class);
        job.setOutputValueClass(NullWritable.class);

        //指定分组规则
        job.setGroupingComparatorClass(Top3GroupingComparator.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    


运行驱动类,指定数据目录以及输出目录:

执行成功后,到输出目录中查看结果。

以上是关于Hadoop3 - MapReduce 分组介绍及案例实践的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce COVID-19 案例实践