MapReduce testing on a YARN cluster

Posted by 淼淼之森


Task: group mobile users' data-traffic records by phone number, then sort them.

 

Test preparation:

First synchronize the system clocks; then, on master, start the HDFS cluster followed by the YARN cluster. Verify with jps:

On master: first NameNode and SecondaryNameNode; then ResourceManager;

On each slave: first DataNode; then NodeManager;

If HDFS and YARN start successfully on master but some daemons on a slave do not come up, start them manually with:

hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager

Then create a phoneflow folder under the local directory "/home/hadoop/test/" and place all the data files to be analyzed in it; a sketch of preparing sample data follows below.
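Each record is expected to be a whitespace-separated line of phone number, upload traffic, and download traffic (the format the mapper below parses). A minimal sketch of creating and uploading hypothetical test data (the file name, the values, and the HDFS path /data/phoneflow are assumptions, not from the original post):

mkdir -p /home/hadoop/test/phoneflow
cat > /home/hadoop/test/phoneflow/flow.log <<'EOF'
13512345678 2481 24681
13612345678 1116 954
13512345678 7335 110349
EOF
hdfs dfs -mkdir -p /data/phoneflow
hdfs dfs -put /home/hadoop/test/phoneflow/flow.log /data/phoneflow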

 

Test goals:

Goal 1: after grouping by phone number, output the records sorted by the sum of upload and download traffic;

Goal 2: after grouping by phone number, output the records sorted by upload traffic first, breaking ties by the sum of upload and download traffic;

 

Test code

Goal 1:

Because sorting is involved, the output is a wrapped Flow object (which itself carries all the per-record information);

For grouping, the Flow class implements the Serializable interface;

For sorting, Flow must additionally implement the WritableComparable interface, overriding the write and readFields methods as well as the compareTo method;

package com.mmzs.bigdata.yarn.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.io.WritableComparable;

public class Flow implements WritableComparable<Flow>, Serializable {

    private static final long serialVersionUID = 1L;

    private String phoneNum; // phone number
    private Long upFlow;     // upload traffic
    private Long downFlow;   // download traffic

    public Flow() {}

    public Flow(String phoneNum, Long upFlow, Long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public Long getTotalFlow() {
        return upFlow + downFlow;
    }

    // Fields are read back in the reducer in the same order they are written here.
    // write is the serialization step.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // readFields is the deserialization step.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNum = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    // Basis for sorting in the reduce stage: descending by total flow.
    // (Never returning 0 is tolerable for MapReduce sorting, though it
    // technically breaks the compareTo contract for equal totals.)
    @Override
    public int compareTo(Flow flow) {
        Long curTotalFlow = this.getTotalFlow();
        Long paramTotalFlow = flow.getTotalFlow();
        Long resFlow = curTotalFlow - paramTotalFlow;
        return resFlow > 0 ? -1 : 1;
    }

    public String getPhoneNum() {
        return phoneNum;
    }
    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }
    public Long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }
    public Long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    // Convenience method to set all fields in one call.
    public void setFlow(String phoneNum, Long upFlow, Long downFlow) {
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public String toString() {
        return new StringBuilder(phoneNum).append("\t")
                .append(upFlow).append("\t")
                .append(downFlow).append("\t")
                .append(getTotalFlow())
                .toString();
    }

}
Flow
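The compareTo above sorts by total flow only, which covers Goal 1. For Goal 2, only the sort order changes; a hedged sketch (this variant is not in the original listing) of how the method in the Flow class could be adapted to sort descending by upload traffic and fall back to total flow on ties:

    // Goal 2 variant (illustrative): replaces Flow.compareTo above.
    @Override
    public int compareTo(Flow flow) {
        // Primary key: upload traffic, descending.
        if (!this.upFlow.equals(flow.getUpFlow())) {
            return this.upFlow > flow.getUpFlow() ? -1 : 1;
        }
        // Tie-breaker: total flow, descending.
        return this.getTotalFlow() > flow.getTotalFlow() ? -1 : 1;
    }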
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PhoneFlowMapper01 extends Mapper<LongWritable, Text, Text, Flow> {

    private Text outKey;
    private Flow outValue;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Flow>.Context context)
            throws IOException, InterruptedException {
        outKey = new Text();
        outValue = new Flow();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Flow>.Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = line.split("\\s+");

        // Filter out invalid or incomplete records.
        if (fields.length < 3) return;

        String phoneNum = fields[0];
        String upFlow = fields[1];
        String downFlow = fields[2];

        outKey.set(phoneNum);
        outValue.setFlow(phoneNum, Long.parseLong(upFlow), Long.parseLong(downFlow));
        context.write(outKey, outValue);
    }

    @Override
    protected void cleanup(Mapper<LongWritable, Text, Text, Flow>.Context context)
            throws IOException, InterruptedException {
        outKey = null;
        outValue = null;
    }

}
PhoneFlowMapper01
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PhoneFlowReducer01 extends Reducer<Text, Flow, NullWritable, Flow> {

    private NullWritable outKey;
    private Flow outValue;

    @Override
    protected void setup(Reducer<Text, Flow, NullWritable, Flow>.Context context) throws IOException, InterruptedException {
        outKey = NullWritable.get();
        outValue = new Flow();
    }

    @Override
    protected void reduce(Text key, Iterable<Flow> values, Reducer<Text, Flow, NullWritable, Flow>.Context context)
            throws IOException, InterruptedException {
        Iterator<Flow> its = values.iterator();

        Long totalUpFlow = 0L; // must be initialized to 0L here, not null
        Long totalDownFlow = 0L;
        while (its.hasNext()) {
            Flow flow = its.next();
            totalUpFlow += flow.getUpFlow(); // accumulate; don't forget the + sign
            totalDownFlow += flow.getDownFlow();
        }

        outValue.setFlow(key.toString(), totalUpFlow, totalDownFlow);
        context.write(outKey, outValue);
    }

    @Override
    protected void cleanup(Reducer<Text, Flow, NullWritable, Flow>.Context context) throws IOException, InterruptedException {
        outValue = null;
    }

}
PhoneFlowReducer01
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author hadoop
 */
public class PhoneFlowDriver01 {

    private static FileSystem fs;
    private static Configuration conf;
    static {
        String uri = "hdfs://master01:9000/";
        conf = new Configuration();
        try {
            fs = FileSystem.get(new URI(uri), conf, "hadoop");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job pfJob01 = getJob(args);
        if (null == pfJob01) {
            return;
        }
        // Submit the job to the cluster and wait for it to finish;
        // true means job progress is reported back to the client.
        boolean flag = false;
        flag = pfJob01.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }

    /**
     * Build the Job instance.
     * @param args
     * @return
     * @throws IOException
     */
    public static Job getJob(String[] args) throws IOException {
        if (null == args || args.length < 2) return null;
        // HDFS path holding the data to process
        Path inputPath = new Path(args[0]);
        // HDFS path where the job writes its results
        Path outputPath = new Path(args[1]);

        // Get the Job instance
        Job pfJob01 = Job.getInstance(conf, "pfJob0102");
        // Entry class of this jar: PhoneFlowDriver01
        pfJob01.setJarByClass(PhoneFlowDriver01.class);
        // Mapper class for the job
        pfJob01.setMapperClass(PhoneFlowMapper01.class);
        // Reducer class (may be omitted if the job has no reducer)
        pfJob01.setReducerClass(PhoneFlowReducer01.class);

        // MapTask output key type
        pfJob01.setMapOutputKeyClass(Text.class);
        // MapTask output value type
        pfJob01.setMapOutputValueClass(Flow.class);

        // Final job output key type (may be omitted if the job has no reducer)
        pfJob01.setOutputKeyClass(NullWritable.class);
        // Final job output value type (may be omitted if the job has no reducer)
        pfJob01.setOutputValueClass(Flow.class);

        // Input path of the data to process
        FileInputFormat.setInputPaths(pfJob01, inputPath);
        // Output path for the computed results
        FileOutputFormat.setOutputPath(pfJob01, outputPath);

        return pfJob01;
    }

}
PhoneFlowDriver01
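Goal 1 uses a second job to do the actual sorting: PhoneFlowMapper02 reads job 1's aggregated output, parses each line back into a Flow, and emits the Flow itself as the map key so that the shuffle orders the records via compareTo.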
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PhoneFlowMapper02 extends Mapper<LongWritable, Text, Flow, NullWritable> {

    private Flow outKey;
    private NullWritable outValue;

    @Override
    protected void setup(Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
            throws IOException, InterruptedException {
        outKey = new Flow();
        outValue = NullWritable.get();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] fields = line.split("\\s+");

        // Skip invalid or incomplete records.
        if (fields.length < 3) return;

        String phoneNum = fields[0];
        String upFlow = fields[1];
        String downFlow = fields[2];

        // The parsed fields are Strings, so convert the numeric ones to Long.
        outKey.setFlow(phoneNum, Long.parseLong(upFlow), Long.parseLong(downFlow));
        context.write(outKey, outValue);
    }

    @Override
    protected void cleanup(Mapper<LongWritable, Text, Flow, NullWritable>.Context context)
            throws IOException, InterruptedException {
        outKey = null;
        outValue = null;
    }

}
PhoneFlowMapper02
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class PhoneFlowReducer02 extends Reducer<Flow, NullWritable, Flow, NullWritable> {

    private NullWritable outValue;

    @Override
    protected void setup(Reducer<Flow, NullWritable, Flow, NullWritable>.Context context) throws IOException, InterruptedException {
        outValue = NullWritable.get();
    }

    @Override
    protected void reduce(Flow key, Iterable<NullWritable> values, Reducer<Flow, NullWritable, Flow, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // This reducer is still required: the keys arrive already sorted because
        // the shuffle phase calls Flow.compareTo while sorting.
        context.write(key, outValue);
    }

    @Override
    protected void cleanup(Reducer<Flow, NullWritable, Flow, NullWritable>.Context context) throws IOException, InterruptedException {
        outValue = null;
    }

}
PhoneFlowReducer02
package com.mmzs.bigdata.yarn.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author hadoop
 *
 * The original listing breaks off in the static block; the remainder below
 * follows PhoneFlowDriver01's template, swapping in the sorting job's
 * Mapper/Reducer and the Flow key type.
 */
public class PhoneFlowDriver02 {

    private static FileSystem fs;
    private static Configuration conf;
    static {
        String uri = "hdfs://master01:9000/";
        conf = new Configuration();
        try {
            fs = FileSystem.get(new URI(uri), conf, "hadoop");
        } catch (IOException | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job pfJob02 = getJob(args);
        if (null == pfJob02) return;
        System.exit(pfJob02.waitForCompletion(true) ? 0 : 1);
    }

    public static Job getJob(String[] args) throws IOException {
        if (null == args || args.length < 2) return null;
        // Input: the aggregated output of job 1; output: the sorted result
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Job pfJob02 = Job.getInstance(conf, "pfJob02");
        pfJob02.setJarByClass(PhoneFlowDriver02.class);
        pfJob02.setMapperClass(PhoneFlowMapper02.class);
        pfJob02.setReducerClass(PhoneFlowReducer02.class);

        // Flow is the key, so the shuffle sorts the records via Flow.compareTo
        pfJob02.setMapOutputKeyClass(Flow.class);
        pfJob02.setMapOutputValueClass(NullWritable.class);
        pfJob02.setOutputKeyClass(Flow.class);
        pfJob02.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(pfJob02, inputPath);
        FileOutputFormat.setOutputPath(pfJob02, outputPath);

        return pfJob02;
    }

}
PhoneFlowDriver02
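Once both jobs are packaged into a jar, they can be chained by pointing job 2 at job 1's output directory. A hypothetical invocation (the jar name and HDFS paths are placeholders, not from the original post):

hadoop jar phoneflow.jar com.mmzs.bigdata.yarn.mapreduce.PhoneFlowDriver01 /data/phoneflow /data/phoneflow-out1
hadoop jar phoneflow.jar com.mmzs.bigdata.yarn.mapreduce.PhoneFlowDriver02 /data/phoneflow-out1 /data/phoneflow-out2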