HadoopMR的分区排序分组

Posted 来自遥远的水星

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HadoopMR的分区排序分组相关的知识,希望对你有一定的参考价值。

一.分区

问题:按照条件将结果输出到不同文件中

自定义分区步骤

1.自定义继承Partitioner类,重写getPartition()方法

2.在job驱动Driver中设置自定义的Partitioner

3.在Driver中根据分区数设置reducetask数

分区数和reducetask关系

案例实操

将统计结果按照手机归属地不同省份输出到不同文件中(分区),手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中

(1)自定义分区类

MyPartitioner.class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phone = text.toString();
        if (phone.startsWith("136")) {
            return 0;
        } else if (phone.startsWith("137")) {
            return 1;
        } else if (phone.startsWith("138")) {
            return 2;
        }else if (phone.startsWith("139")){
            return 3;
        }else {
            return 4;
        }
    }
}
(2)在Driver类设置分区和reducetask数
//设置自定义partitioner
job.setPartitionerClass(MyPartioner.class);
//设置reducetask数量
job.setNumReduceTasks(5);

二.全排序、分区排序、分组

当自定义的对象作为key,按照指定条件进行排序

实现排序的2种方式

1.对象实现WritableComparable接口

实现WritableComparable接口,重写compareTo方法,就可以实现排序(二次排序)

public class OrderBean implements WritableComparable<OrderBean> { 
   
    //自定义排序,先按pid升序,再按pname降序
    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid);
        if (compare == 0) {
            return -this.pname.compareTo(o.pname);
        }
        return compare;
    }
}
2.继承WritableComparator类

自定义比较器继承WritableComparator类,父类构造方法增加需要比较的Bean对象,

//继承WritableComparator类
public class MyGroupCompartor extends WritableComparator {

    public MyGroupCompartor(){
		//增加Bean对象		 	
        super(OrderBean.class,true);
    } 
	// 对Bean的排序方法
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}

全排序

不分区,只有一个reducetask,针对Key进行排序

分区排序

针对key全排序,然后针对key进行分区

辅助排序【自定义分组】

分析:已经对key进行排序,比如key对象为OrderBean的排序是id,pname的二次排序

,在进入reduce()的分组希望是id相同的进入一组,那么就需要自定义分组针对id进行分组

OrderBean
id		pname  amount
1		小米		
1				 2400
1            1500
2		华为
2				2400
2           3400
自定义分组比较器

MyGroupCompartor.class

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupCompartor extends WritableComparator {

    public MyGroupCompartor(){
        super(OrderBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}
在Driver类中声明自定义分组
job.setGroupingComparatorClass(MyGroupCompartor.class);

以上是关于HadoopMR的分区排序分组的主要内容,如果未能解决你的问题,请参考以下文章

分区函数Partition By的与row_number()的用法以及与排序rank()的用法详解(获取分组(分区)中前几条记录)(转)

mapreduce 的二次排序

hive分组排序函数 分组取top10

3.算子+PV&UV+submit提交参数+资源调度和任务调度源码分析+二次排序+分组topN+SparkShell

MySQL分组、排序

spark 例子wordcount topk