MapReduce Custom Partitioner Example

Posted by tiepihetao

This post walks through a complete MapReduce example that uses a custom Partitioner to split employee records into separate output files by department number.
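
The input is a comma-separated employee file, one record per line, with the fields empno, ename, job, mgr, hiredate, sal, comm, and deptno:

    7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30

The mgr and comm fields may be empty in the input, which is why the Mapper below wraps their parsing in try/catch blocks and falls back to 0.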

Mapper:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line, e.g.: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String str = value.toString();
        // Split the line into fields
        String[] words = str.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));
        e.setEname(words[1]);
        e.setJob(words[2]);
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception e2) {
            // mgr may be empty in the input; default to 0
            e.setMgr(0);
        }
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5]));
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception e2) {
            // comm may be empty in the input; default to 0
            e.setComm(0);
        }
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit the employee keyed by department number
        context.write(new LongWritable(e.getDeptno()), e);
    }
}
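
Note that the map output key is the employee's department number, so the partitioner can route each record using either the key or the deptno field of the Employee value.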

Reducer:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    @Override
    protected void reduce(LongWritable deptno, Iterable<Employee> values, Context context)
            throws IOException, InterruptedException {
        // Write every employee under its department key
        for (Employee e : values) {
            context.write(deptno, e);
        }
    }
}
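
The reducer is a simple pass-through: it writes every employee under its department key. The actual separation into per-department output files is done by the partitioner together with the number of reduce tasks set in the driver.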

Employee:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Employee implements Writable {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    public Employee() {
    }

    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", job=" + job
                + ", mgr=" + mgr + ", hiredate=" + hiredate + ", sal=" + sal
                + ", comm=" + comm + ", deptno=" + deptno + "]";
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() wrote them
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Field order matches the input: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
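
Employee only needs to implement Writable here because it is used as a value. If it were used as a map output key, it would have to implement WritableComparable instead, so the framework could sort and group on it.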

Partitioner:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployeePartition extends Partitioner<LongWritable, Employee> {

    @Override
    public int getPartition(LongWritable key2, Employee e, int numPartition) {
        // Partitioning rule: route each record by department number.
        // The modulo keeps the result in range even if the job runs
        // with fewer reduce tasks than expected.
        if (e.getDeptno() == 10) {
            return 1 % numPartition;
        } else if (e.getDeptno() == 20) {
            return 2 % numPartition;
        } else {
            return 3 % numPartition;
        }
    }
}
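
With the three reduce tasks configured in the driver, department 10 goes to partition 1 % 3 = 1, department 20 to partition 2 % 3 = 2, and every other department (including 30) to partition 3 % 3 = 0. Each partition becomes one output file: part-r-00000, part-r-00001, and part-r-00002.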

Driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionMain {

    public static void main(String[] args) throws Exception {
        // Partition employee records into separate output files by department
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Entry point of the job
        job.setJarByClass(PartitionMain.class);

        // Mapper and its output types
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Custom partitioner; the number of reduce tasks must match
        // the number of partitions the rule produces
        job.setPartitionerClass(EmployeePartition.class);
        job.setNumReduceTasks(3);

        // Reducer and its output types
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        // Input and output paths ---> paths on HDFS
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
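
To run the job, package the classes into a jar and submit it with input and output paths on HDFS. The jar and path names below are only placeholders:

    hadoop jar partition-example.jar PartitionMain /input/emp.csv /output/emp_partition

After the job finishes, the output directory contains one part-r-0000N file per reduce task, each holding the employees of the corresponding partition.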
