MR案例实现小练习
Posted Vics异地我就
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MR案例实现小练习相关的知识,希望对你有一定的参考价值。
MR案例实现
题目:现有一张emp表,字段分别为
员工编号,员工姓名,工作,管理编号,生日,工资,备注,部门编号
数据:
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/7/13,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/7/13,1100,,20
7783,BOB,ANALYST,7777,1983/7/13,3200,,10
7822,BING,PRESIDENT,,2001/12/17,3000,,20
1233,CINFG,PRESIDENT,,2001/12/17,4000,,30
1233,FFSFG,PRESIDENT,999999,2001/12/17,4000,999999,10
2312,SDA,CLERK,3422,1987/7/13,2222,,30
4353,DFDS,CLERK,4563,1987/7/13,3111,999999,20
4564,RTEW,CLERK,5645,1987/7/13,6753,,20
7783,WOOP,ANALYST,7777,1983/7/13,5500,,10
5675,COC,ANALYST,7777,1983/7/13,6750,,30
3222,DOD,ANALYST,3422,1983/7/13,8400,,20,12
3211,EOE,ANALYST,7777,1983/7/13,2500,,10,33
目标:
根据工作类型(job)进行分区,
分区之下对每个员工按照部门(deptno)进行分组,
分组内部保证工资(sal)是降序。
每个分区中拿到每个部门工资排名第二的人(no.2)的信息。
注意:
请注意字段个数(只获取分割后数组大小为8的)
数据中999999为无效数据,请去除。
最终效果:
三个文件:
以下为代码段 Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Map_05 extends Mapper<LongWritable, Text, bean_05, NullWritable>
bean_05 k = new bean_05();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
//读取文件内容并以“,”进行分割
String[] strings = value.toString().split(",");
//根据题目要求,排除字段数量不为8且倒数第二字段为999999的数据,
if (strings.length == 8 && !strings[strings.length - 2].equals("999999"))
//根据要求字段类型存入K对应的值。
k.set(strings[1],strings[7],strings[0],
strings[2],strings[3],strings[4],
Integer.parseInt(strings[5]),strings[6]);
//让MAP进行输出
context.write(k, NullWritable.get());
Reducer 代码段
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Reducer_05 extends Reducer<bean_05, NullWritable, bean_05, NullWritable>
bean_05 k = new bean_05();
@Override
protected void reduce(bean_05 key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
//定义一个常量初始化数值为0
int a = 0;
for (NullWritable value : values)
//执行如果a值为1则打印输出, 不为1的不打印进而选出二条数据
if (a==1)
context.write(key, NullWritable.get());
else a++;
分组代码段
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class fenzu_05 extends WritableComparator
public fenzu_05()
super(bean_05.class,true);
@Override
public int compare(WritableComparable a, WritableComparable b)
bean_05 a1= (bean_05) a ;
bean_05 b1= (bean_05) b ;
//利用get方法,判断Deptno是否相等,相等返回0,不等返回1即可;
//这样可以把Deptno一样的分在一个组里。
if (a1.getDeptno().equals(b1.getDeptno()))
return 0;
else return 1;
分区代码段
package MapReduce_05;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class fenqu_05 extends Partitioner<bean_05, NullWritable>
@Override
public int getPartition(bean_05 bean_05, NullWritable nullWritable, int i)
//利用Switch 判断字段是否相等,从而产生分区效果,让job字段相同的去同一个分区;
switch (bean_05.getJob())
case "ANALYST":
return 0;
case "CLERK":
return 1;
case "PRESIDENT":
return 2;
case "SALESMAN":
return 3;
case "MANAGER":
return 4;
default:
return 5;
Bean 代码段
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class bean_05 implements WritableComparable<bean_05>
//定义变量
private String ename;
private String deptno;
private String empno;
private String job;
private String mgr;
private String hiredate;
private int sal;
private String com;
public bean_05(String ename, String deptno, String empno, String job, String mgr, String hiredate, int sal, String com)
this.ename = ename;
this.deptno = deptno;
this.empno = empno;
this.job = job;
this.mgr = mgr;
this.hiredate = hiredate;
this.sal = sal;
this.com = com;
public void set(String ename, String deptno,
String empno, String job, String mgr,
String hiredate, int sal, String com)
this.ename = ename;
this.deptno = deptno;
this.empno = empno;
this.job = job;
this.mgr = mgr;
this.hiredate = hiredate;
this.sal = sal;
this.com = com;
public bean_05()
public String getEname()
return ename;
public void setEname(String ename)
this.ename = ename;
public String getDeptno()
return deptno;
public void setDeptno(String deptno)
this.deptno = deptno;
public String getEmpno()
return empno;
public void setEmpno(String empno)
this.empno = empno;
public String getJob()
return job;
public void setJob(String job)
this.job = job;
public String getMgr()
return mgr;
public void setMgr(String mgr)
this.mgr = mgr;
public String getHiredate()
return hiredate;
public void setHiredate(String hiredate)
this.hiredate = hiredate;
public int getSal()
return sal;
public void setSal(int sal)
this.sal = sal;
public String getCom()
return com;
public void setCom(String com)
this.com = com;
//重新定义比较方法题设要求根据薪水降序排列,因此需要重写。
//本目的找出deptno相同组下的sal进行排序,不同组不排序。
//因此仅当 compareto 方法返回值为0时才进行下一步排序;
@Override
public int compareTo(bean_05 o)
int a;
if (this.deptno.compareTo(o.getdeptno) == 0)
a = this.sal > o.getSal() ? -1 : (this.sal < o.getSal() ? 1 : 0);
else a = this.deptno.compareTo(o.deptno);
return a;
//序列化与反序列化
@Override
public void write(DataOutput dataOutput) throws IOException
dataOutput.writeUTF(empno);
dataOutput.writeUTF(ename);
dataOutput.writeUTF(job);
dataOutput.writeUTF(mgr);
dataOutput.writeUTF(hiredate);
dataOutput.writeInt(sal);
dataOutput.writeUTF(com);
dataOutput.writeUTF(deptno);
@Override
public void readFields(DataInput in) throws IOException
empno = in.readUTF();
ename = in.readUTF();
job = in.readUTF();
mgr = in.readUTF();
hiredate = in.readUTF();
sal = in.readInt();
com = in.readUTF();
deptno = in.readUTF();
//重写tostring方法 避免地址打印
@Override
public String toString()
return "bean_05" +
"ename='" + ename + '\\'' +
", deptno='" + deptno + '\\'' +
", empno='" + empno + '\\'' +
", job='" + job + '\\'' +
", mgr='" + mgr + '\\'' +
", hiredate='" + hiredate + '\\'' +
", sal=" + sal +
", com='" + com + '\\'' +
'';
Runner 代码段
import Mapreducer.delete;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
public class Runner_05
public static void main(String[] args) throws Exception
//设定conf 读取配置文件
Configuration conf = new Configuration();
//设置本地运行
conf.set("mapreduce.framework.name","local");
//创建job对象
Job job = Job.getInstance(conf);
//设置job的jar包
job.setJarByClass(Runner_05.class);
job.setMapperClass(Map_05.class);
job.setReducerClass(Reducer_05.class);
//设置map输出对象
job.setMapOutputKeyClass(bean_05.class);
job.setMapOutputValueClass(NullWritable.class);
//设置reduce输出对象
job.setOutputKeyClass(bean_05.class);
job.setOutputValueClass(NullWritable.class);
//设置分区
job.setPartitionerClass(fenqu_05.class);
//设置task个数
job.setNumReduceTasks(6);
//设置分组
job.setGroupingComparatorClass(fenzu_05.class);
//配置输入输出路径
String in = "D:\\\\in";
String out = "D:\\\\out";
FileInputFormat.setInputPaths(job, new Path(in));
//检测文件夹是否存在如果存在即删除
delete.deleteFile(new File(out));
FileOutputFormat.setOutputPath(job, new Path(out));
//向集群提交并等待结束
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
以上是关于MR案例实现小练习的主要内容,如果未能解决你的问题,请参考以下文章