2018-07-30期 MapReduce对象排序(单列排序)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2018-07-30期 MapReduce对象排序(单列排序)相关的知识,希望对你有一定的参考价值。
1、EmpSalaryBean1 对象类
package cn.sjq.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.io.WritableComparable;
/**
 * Employee salary record used as a MapReduce key.
 *
 * <p>Implements Hadoop's {@code WritableComparable} so that instances can be serialized
 * ({@link #write(DataOutput)} / {@link #readFields(DataInput)}) and sorted by the framework.
 *
 * <p>Natural ordering: ascending by department name ({@code c_dept_name}), ties broken by
 * {@code seq}. {@link #equals(Object)} and {@link #hashCode()} use the same two fields, so they
 * are consistent with {@link #compareTo(EmpSalaryBean1)}.
 *
 * @author songjq
 */
public class EmpSalaryBean1 implements WritableComparable<EmpSalaryBean1> {

    // Record fields, listed in the order write()/readFields() serialize them.
    private int seq;
    private String c_oid;
    private String c_employee_name;
    private String c_second_company_name;
    private String c_third_company_name;
    private String c_fourth_company_name;
    private String c_company_name;
    private String c_dept_name;
    // NOTE(review): these appear to be monetary amounts, for which float is imprecise; the type is
    // kept because it is part of the public getters/setters and of the serialized format.
    private float c_sub_total;
    private float c_com_fund_payamt;

    public int getSeq() {
        return seq;
    }

    public void setSeq(int seq) {
        this.seq = seq;
    }

    public String getC_oid() {
        return c_oid;
    }

    public void setC_oid(String c_oid) {
        this.c_oid = c_oid;
    }

    public String getC_employee_name() {
        return c_employee_name;
    }

    public void setC_employee_name(String c_employee_name) {
        this.c_employee_name = c_employee_name;
    }

    public String getC_second_company_name() {
        return c_second_company_name;
    }

    public void setC_second_company_name(String c_second_company_name) {
        this.c_second_company_name = c_second_company_name;
    }

    public String getC_third_company_name() {
        return c_third_company_name;
    }

    public void setC_third_company_name(String c_third_company_name) {
        this.c_third_company_name = c_third_company_name;
    }

    public String getC_fourth_company_name() {
        return c_fourth_company_name;
    }

    public void setC_fourth_company_name(String c_fourth_company_name) {
        this.c_fourth_company_name = c_fourth_company_name;
    }

    public String getC_company_name() {
        return c_company_name;
    }

    public void setC_company_name(String c_company_name) {
        this.c_company_name = c_company_name;
    }

    public String getC_dept_name() {
        return c_dept_name;
    }

    public void setC_dept_name(String c_dept_name) {
        this.c_dept_name = c_dept_name;
    }

    public float getC_sub_total() {
        return c_sub_total;
    }

    public void setC_sub_total(float c_sub_total) {
        this.c_sub_total = c_sub_total;
    }

    public float getC_com_fund_payamt() {
        return c_com_fund_payamt;
    }

    public void setC_com_fund_payamt(float c_com_fund_payamt) {
        this.c_com_fund_payamt = c_com_fund_payamt;
    }

    /**
     * Deserializes this record, reading fields in exactly the order {@link #write(DataOutput)}
     * writes them.
     *
     * @param in source of the serialized bytes
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.seq = in.readInt();
        this.c_oid = in.readUTF();
        this.c_employee_name = in.readUTF();
        this.c_second_company_name = in.readUTF();
        this.c_third_company_name = in.readUTF();
        this.c_fourth_company_name = in.readUTF();
        this.c_company_name = in.readUTF();
        this.c_dept_name = in.readUTF();
        this.c_sub_total = in.readFloat();
        this.c_com_fund_payamt = in.readFloat();
    }

    /**
     * Serializes this record.
     *
     * <p>Every String field must be non-null: {@link DataOutput#writeUTF(String)} throws a
     * {@code NullPointerException} for null, and a {@code UTFDataFormatException} when a value's
     * modified-UTF-8 encoding exceeds 65535 bytes.
     *
     * @param out destination for the serialized bytes
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.seq);
        out.writeUTF(this.c_oid);
        out.writeUTF(this.c_employee_name);
        out.writeUTF(this.c_second_company_name);
        out.writeUTF(this.c_third_company_name);
        out.writeUTF(this.c_fourth_company_name);
        out.writeUTF(this.c_company_name);
        out.writeUTF(this.c_dept_name);
        out.writeFloat(this.c_sub_total);
        out.writeFloat(this.c_com_fund_payamt);
    }

    /**
     * Returns all fields, in serialization order, separated by single spaces. The result ends
     * with a trailing space.
     */
    @Override
    public String toString() {
        return this.seq+" "+this.c_oid+" "+
                this.c_employee_name+" "+this.c_second_company_name+" "+
                this.c_third_company_name+" "+this.c_fourth_company_name+" "+
                this.c_company_name+" "+this.c_dept_name+" "+
                this.c_sub_total+" "+this.c_com_fund_payamt+" ";
    }

    /**
     * Single-column sort: orders records ascending by department name ({@code c_dept_name},
     * lexicographic String order).
     *
     * <p>Records in the same department are ordered by {@code seq}. This tie-break keeps the
     * comparison a valid total order: it returns 0 for equal records and is antisymmetric.
     * Department-mates still stay distinct keys unless they also share {@code seq}.
     *
     * <p>Assumes {@code c_dept_name} is non-null on both sides; a null value throws
     * {@code NullPointerException}.
     *
     * @param o the record to compare against
     * @return negative, zero, or positive as this record sorts before, equal to, or after {@code o}
     */
    @Override
    public int compareTo(EmpSalaryBean1 o) {
        int byDept = this.c_dept_name.compareTo(o.getC_dept_name());
        if (byDept != 0) {
            return byDept;
        }
        return Integer.compare(this.seq, o.getSeq());
    }

    /**
     * Two records are equal when they have the same department name and {@code seq}. This
     * matches {@link #compareTo(EmpSalaryBean1)} returning 0.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof EmpSalaryBean1)) {
            return false;
        }
        EmpSalaryBean1 other = (EmpSalaryBean1) obj;
        return this.seq == other.seq && Objects.equals(this.c_dept_name, other.c_dept_name);
    }

    /**
     * Hash over the same fields as {@link #equals(Object)}. Hadoop's default hash partitioner
     * therefore routes a given key deterministically, instead of using an identity hash.
     */
    @Override
    public int hashCode() {
        return Objects.hash(this.c_dept_name, this.seq);
    }
}
2、ObjectSortSingleClumn实现类
package cn.sjq.mr.sort;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 利用MapReduce对EmpSalaryBean排序
* @author songjq
*
*/
public class ObjectSortSingleClumn {
/**
* 排序Mapper类
* 由于MapReduce排序主要发生在Key2,因此需要将EmpSalaryBean作为Key2的输出
* 这里使用mapper类即可,不需要写Reducer类
* @author songjq
*
*/
static class ObjectSortSingleClumnMapper extends Mapper<LongWritable, Text, EmpSalaryBean1, NullWritable>{
@Override
protected void map(LongWritable key1, Text value1,
Context context)
throws IOException, InterruptedException {
//获取一行
String line = value1.toString();
//分词
String[] fds = StringUtils.split(line, ",");
//将分词数据封装到EmpSalaryBean对象
EmpSalaryBean1 es = new EmpSalaryBean1();
es.setSeq(new Integer(fds[0]).intValue());
es.setC_oid(fds[1]);
es.setC_employee_name(fds[2]);
es.setC_second_company_name(fds[3]);
es.setC_third_company_name(fds[4]);
es.setC_fourth_company_name(fds[5]);
es.setC_company_name(fds[6]);
es.setC_dept_name(fds[7]);
es.setC_sub_total(new Float(fds[8]).floatValue());
es.setC_com_fund_payamt(new Float(fds[9]).floatValue());
//序列化输出到Reducer
context.write(es,NullWritable.get());
}
}
/**
* 提交job
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(ObjectSortSingleClumn.class);
job.setMapperClass(ObjectSortSingleClumnMapper.class);
job.setMapOutputKeyClass(EmpSalaryBean1.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(EmpSalaryBean1.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\test\tmp\sort\empsalary.csv"));
FileOutputFormat.setOutputPath(job, new Path("D:\test\tmp\sort\out7"));
job.waitForCompletion(true);
}
}
以上是关于2018-07-30期 MapReduce对象排序（单列排序）的主要内容。如果未能解决你的问题，请参考以下文章：
2018-07-29期 MapReduce实现对字符串进行排序
2018-08-02 期 MapReduce实现多表查询自连接