hadoop MapReduce之辅助排序
Posted 健哥说编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop MapReduce之辅助排序相关的知识,希望对你有一定的参考价值。
辅助排序
也可以叫做二次排序或SecondarySort。
前言:
1:MapReduce框架在记录到达Reducer之前已经按键记录排序,但键所对应的值并没有排序。这些值来自不同的map任务。
2:一般来说,大多数MapReduce程序会避免让Reduce函数依赖于值的排序。
3:但有时,也需要按特性的方法对键进行排序和分组等以实现对值的排序。
示例:
假设存在以下气温记录
Datetemperature
2001/01/1123
2001/01/1215
2001/01/1318
2001/01/1430
2002/01/1134
2002/01/1212
2002/01/1318
...
现在需要查询出每年最高气温的那一天,即最终显示的结果为:
2001/01/1430
2002/01/1134
...
步1:排序输出
由于是按年进行统计,所以,我们可以使用年(yyyy)做为key进行排序处理。我们只需要按气温value倒序显示,并在Reducer中只输出key,不处理Iterable<..>的记录即可以获取最高的记录。
为了可以保存日期和气温的所有信息,我们可以声明一个JavaBean用于保存上面解析出来的气温信息。以下是JavaBean的源代码,它实现了WritableComparable接口类:
类名:Temperature。源代码如下,以下代码省去了getters和setters方法:
package cn.wangjian.mapreduce;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 开发封装每一天气温的JavaBean
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class Temperature implements WritableComparable<Temperature> {
private String year;//可以使用int类型,也可以使用String类型
private String date;
private int temperature;
/**
* 在比较的代码中,我们先按年进行正序的比较,再按气温进行倒序的比较,这样气温最高的就会在第一行
* 如果这儿只按某一个值进行了比较,还可以通过setSortComparatorClass修改这儿的排序规则
*/
@Override
public int compareTo(Temperature o) {
int comp = this.year.compareTo(o.year);
if (comp == 0) {
comp = -(this.temperature - o.temperature);//注意前面取了反值,即倒序
}
return comp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(year);
out.writeUTF(date);
out.writeInt(temperature);
}
@Override
public void readFields(DataInput in) throws IOException {
year = in.readUTF();
date = in.readUTF();
temperature = in.readInt();
}
@Override
public String toString() { //通过重写toString输出一个格式化的数据
return year+"\t"+date+"\t"+temperature;
}
}
根据上面的代码,我们已经先根据年进行了排序,又根据气温降序进行了排序。现在我们先开发好Mapper的Reducer并开发一个驱动,查看输出的结果:
开发Mapper,类名:TerperatureMapper:
通过mapper类的源代码可知,只是对输入的字符串,进行分割并封装到Temperature类中,并没有什么特别的代码。
package cn.wangjian.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 处理接收到的数据并进行每一行的分析,然后直接输出分析并封装好的对象
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class TemperatureMapper extends Mapper<LongWritable,Text,Temperature,NullWritable> {
private Temperature key2 = new Temperature();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\\s+");
String year = strs[0].substring(0,4);//取出年
int airTemp = Integer.parseInt(strs[1]);
key2.setYear(year);
key2.setTemperature(airTemp);
key2.setDate(strs[0]);
context.write(key2,NullWritable.get());
}
}
开发Reducer,其源代码如下:
通过查看Reducer的源代码,可知,并没有处理value部分的数据,只是将key中的数据直接输出。
package cn.wangjian.mapreduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class TemperatureReducer extends Reducer<Temperature,NullWritable,Temperature,NullWritable> {
@Override
public void reduce(Temperature key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//接收到数据以后直接输出
context.write(key,NullWritable.get());
}
}
现在开发驱动,先暂时输出查看输出的数据格式:
package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 辅助排序
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class TemperatureMR extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if(args.length!=2){
System.out.println("usage : input output");
return -1;
}
Configuration config = getConf();
FileSystem fs = FileSystem.get(config);
Path dest = new Path(args[1]);
if(fs.exists(dest)){
fs.delete(dest,true);
}
Job job = Job.getInstance(config,"辅助排序");
job.setJarByClass(getClass());
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,dest);
job.setMapperClass(TemperatureMapper.class);
job.setMapOutputKeyClass(Temperature.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Temperature.class);
job.setOutputValueClass(NullWritable.class);
int code = job.waitForCompletion(true)?0:1;
return code;
}
public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new TemperatureMR(),args);
System.exit(code);
}
}
执行上面的MR驱动程序,输出以下结果:
通过结果,可以看出,已经按年进行了排序,且也已经根据气温的降序进行了排序。
20012001/01/1430
20012001/01/1123
20012001/01/1318
20012001/01/1215
20022002/01/1134
20022002/01/1318
20022002/01/1212
步2:开发分区类
现在我们需要开发一个Partitioner,根据年,以便于将相同的年放到同一个分区中处理。这样每一个分区中的第一行,就是本年气温最高的记录。然后在自己开发的Reducer的reduce方法中,只需要输出key就可以了。
现在发开发分区,源代码如下:
package cn.wangjian.mapreduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class TemperaturePartitioner extends Partitioner<Temperature, NullWritable> {
@Override
public int getPartition(Temperature temperature, NullWritable nullWritable, int numPartitions) {
return temperature.getYear().hashCode() % numPartitions;
}
}
步3:开发分组规则
在到达Reducer的reduce方法之前,会使用以下方法进行数据的分组合并,现在修改这个分组的策略:
package cn.wangjian.mapreduce;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 指定用于分组的类
* @author wangjian
* @version 1.0 2018/5/30 0030
*/
public class TemperatureGrouping extends WritableComparator {
public TemperatureGrouping(){
super(Temperature.class,true);
}
/**
* 注意重写的方法接收的参数
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
//根据名称来进行分组所以比较名称
Temperature t1 = (Temperature) a;
Temperature t2 = (Temperature) b;
return t1.getYear().compareTo(t2.getYear());//比较年
}
}
步4:注册这两个类
job.setPartitionerClass(TemperaturePartitioner.class);
job.setGroupingComparatorClass(TemperatureGrouping.class);
步5:然后启动MR测试
输出以下结果,即为每一年,气温最高的那一天的记录:
20012001/01/1430
20022002/01/1134
小结:
1:使用自定义WritableComparable的JavaBean并在compareTo方法中实现自己比较的规则。
2:如果然后修改规则,可以再另开发一个WritableComparator的子类。并通过job.setSortComparatorClass修改javaBean中的比较的规则 。
3:开发类实现WritableComparator用于指定分组的规则。然后通过job.setGroupingComparatorClass进行注册 。
4:其他:setOutputKeyComparatorClass,0.20以后为:setSortComparatorClass<br>
* setOutputValueGroupingComparator,0.20以后为:setGroupingComparatorClass<br>
以上是关于hadoop MapReduce之辅助排序的主要内容,如果未能解决你的问题,请参考以下文章
大数据之Hadoop(MapReduce):Partition之WritableComparable排序
大数据之Hadoop(MapReduce):WritableComparable排序案例实操(全排序)
Hadoop MapReduce编程 API入门系列之二次排序