(十四)MapReducer运行流程
Posted 淡水留恋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(十四)MapReducer运行流程相关的知识,希望对你有一定的参考价值。
//创建配置文件
Configuration conf = new Configuration();
//通过反射获取一个job对象
Job job = Job.getInstance(conf);
//指定本程序的jar包所在的本地路径
job.setJarByClass();
//指定mapper业务类和reducer业务类
job.setMapperClass();
job.setReducerClass();
//指定mapper输出数据的kv类型
job.setMapOutputKeyClass();
job.setMapOutputValueClass();
//指定最终输出的数据的kv类型
job.setOutputKeyClass();
job.setOutputValueClass();
FileInputFormat.setInputPaths(job, new Path("file:///wordcount/gpinput"));
FileOutputFormat.setOutputPath(job, new Path("file:///wordcount/gpoutput"));
自定义输出文件类,和输入文件类
//在此设置自定义的Groupingcomparator类 分组的类
job.setGroupingComparatorClass(ItemidGroupingComparator.class);
//利用reduce端的GroupingComparator来实现将一组bean看成相同的key reducer业务类开始之前会对数据进行分组,
默认相同的key会合并,所有的values用迭代器来合并。可以指定为自己写的方法来分组
public class ItemidGroupingComparator extends WritableComparator {
//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
protected ItemidGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean abean = (OrderBean) a;
OrderBean bbean = (OrderBean) b;
//比较两个bean时,指定只比较bean中的orderid
return abean.getItemid().compareTo(bbean.getItemid());
}
}
//在此设置自定义的partitioner类 ,
1、mapper业务类在调用OutPutCollector像环形缓冲区写数据
2、环形缓冲区中的数据通过spiller将数据 分组(HashPartitionet)、排序(key.compareTo)、可能会有Combiner(作用:合并相同的key,)
3、输出的数据就是分组之后的有序数据
4、因此可以自定义方法进行分组和排序
job.setPartitionerClass(ItemIdPartitioner.class);
//指定分组
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{
@Override
public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
//相同id的订单bean,会发往相同的partition
//而且,产生的分区数,是会跟用户设置的reduce task数保持一致
return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
//指定排序 继承WritableComparable类,并重写compareTo方法
public class OrderBean implements WritableComparable<OrderBean>
@Override
public int compareTo(OrderBean o) {
int cmp = this.itemid.compareTo(o.getItemid());
if (cmp == 0) {
cmp = -this.amount.compareTo(o.getAmount());
}
return cmp;
}
//指定reduce task的数量
job.setNumReduceTasks(2);
//提交job类
job.waitForCompletion(true);
以上是关于(十四)MapReducer运行流程的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop运行中NameNode闪退和运行mapreducer时卡在Running job.....