Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2

Posted 大数据和人工智能躺过的坑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2相关的知识,希望对你有一定的参考价值。

 

 

   下面,是版本1。

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一)

 

 

 

 

   这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码。这里不多赘述,直接送上代码。

 

MRUnit 框架

MRUnitCloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用。MRUnit针对不同测试对象使用不同的Driver:

        MapDriver:针对单独的Map测试

        ReduceDriver:针对单独的Reduce测试

        MapReduceDriver:将map和reduce串起来测试

        PipelineMapReduceDriver:将多个MapReduce对串志来测试

 

   记得,将这个jar包,放到工程项目里。我这里是在工程项目的根目录下的lib下。

 

 

 

 

代码版本2

 

 

      编写TemperatureMapperTest.java的代码。  编译,出现以下,则说明无误。

 

   在test()方法中,withInput的key/value参数分别为偏移量和一行气象数据,其类型要与TemperatureMapper的输入类型一致即为LongWritable和Text。 withOutput的key/value参数分别是我们期望输出的new Text("03103")和new IntWritable(200),我们要达到的测试效果就是我们的期望输出结果与 TemperatureMapper 的实际输出结果一致。

     测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Mapper 测试成功了。

 

 

 

 

 

    创建TemperatureReduceTest.java,来对Reduce进行测试。

     在test()方法中,withInput的key/value参数分别为new Text(key)和List类型的集合values。withOutput 的key/value参数分别是我们所期望输出的new Text(key)和new IntWritable(150),我们要达到的测试效果就是我们的期望输出结果与TemperatureReducer实际输出结果一致。

 

 编写TemperatureReduceTest.java的代码。  编译,出现以下,则说明无误。

        Reducer 端的单元测试,鼠标放在 TemperatureReduceTest 类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 Reducer 测试成功了。

 

 

 

 

 

 

 

 

 

 

 

 MapReduce 单元测试

        把 Mapper 和 Reducer 集成起来的测试案例代码如下。

      创建TemperatureTest.java,来进行测试。

  在 test() 方法中,withInput添加了两行测试数据line和line2,withOutput 的key/value参数分别为我们期望的输出结果new Text("03103")和new IntWritable(150)。我们要达到的测试效果就是我们期望的输出结果与Temperature实际的输出结果一致。

 

    编写TemperatureTest.java的代码。 编译,出现以下,则说明无误。

        Reducer 端的单元测试,鼠标放在 TemperatureTest.java类上右击,选择 Run As ——> JUnit test,运行结果如下所示。

    测试方法为 test() 方法,左边的对话框里显示"Runs:1/1,Errors:0,Failures:0",说明 MapReduce 测试成功了。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Temperature.java代码

  1 package zhouls.bigdata.myMapReduce.TemperatureTest;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.io.IntWritable;
  6 import org.apache.hadoop.io.LongWritable;
  7 import org.apache.hadoop.io.Text;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.conf.Configured;
 12 import org.apache.hadoop.fs.FileSystem;
 13 import org.apache.hadoop.fs.Path;
 14 import org.apache.hadoop.io.IntWritable;
 15 import org.apache.hadoop.io.Text;
 16 import org.apache.hadoop.mapreduce.Job;
 17 import org.apache.hadoop.mapreduce.Mapper;
 18 import org.apache.hadoop.mapreduce.Reducer;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 22 import org.apache.hadoop.util.Tool;
 23 import org.apache.hadoop.util.ToolRunner;
 24 
 25 
 26 /*
 27 Hadoop内置的数据类型:
 28     BooleanWritable:标准布尔型数值
 29     ByteWritable:单字节数值
 30     DoubleWritable:双字节数值
 31     FloatWritable:浮点数
 32     IntWritable:整型数
 33     LongWritable:长整型数
 34     Text:使用UTF8格式存储的文本
 35     NullWritable:当<key, value>中的key或value为空时使用
 36 */
 37 
 38 
 39 /**
 40  * 统计美国每个气象站30年来的平均气温
 41  * 1、编写map()函数
 42  * 2、编写reduce()函数
 43  * 3、编写run()执行方法,负责运行MapReduce作业
 44  * 4、在main()方法中运行程序
 45  * 
 46  * @author zhouls
 47  *
 48  */
 49                         //继承Configured类,实现Tool接口
 50 public class Temperature extends Configured implements Tool{
 51     public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 52                                 //输入的key,输入的value,输出的key,输出的value
 53         //输入的LongWritable键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
 54         
 55 //        在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。
 56         
 57         
 58 //        map 函数的功能仅限于提取气象站和气温信息
 59         
 60         /**
 61          * @function Mapper 解析气象站数据
 62          * @input key=偏移量  value=气象站数据
 63          * @output key=weatherStationId value=temperature
 64          */
 65         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
 66                                 //map()函数还提供了context实例,用于键值对的输出  或者说 map() 方法还提供了 Context 实例用于输出内容的写入
 67             
 68 //                                就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。
 69 //                同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
 70             
 71             
 72             //第一步,我们将每行气象站数据转换为每行的String类型
 73             String line = value.toString(); //每行气象数据
 74 //            values是1980 12 01 00    78   -17 10237   180    21     1     0     0
 75 //            line是"1980 12 01 00    78   -17 10237   180    21     1     0     0"
 76             
 77             
 78             //第二步:提取气温值
 79             int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
 80                                  //需要转换为整形,截取第14位到19位,从第0位开始,trim()的功能是去掉首尾空格。
 81                                 //substring()方法截取我们业务需要的值
 82             
 83 //            substring(start, stop)其内容是从 start 处到 stop-1 处的所有字符,其长度为 stop 减 start。
 84             
 85 //            如Hello world!    若是substring(3,7)        则是lo w
 86             
 87 //                                Integer.parseInt() 返回的是一个int的值。在这里, 给temperature
 88             
 89 //             new Integer.valueof()返回的是Integer的对象。 
 90 //             Integer.parseInt() 返回的是一个int的值。
 91 //             new Integer.valueof().intValue();返回的也是一个int的值。 
 92             
 93             
 94             
 95             
 96             
 97 //            1980 12 01 00    78   -17 10237   180    21     1     0     0
 98                             //78是气温值
 99             
100 //            temperature是78
101             
102 //            30yr_03103.dat
103 //            30yr_03812.dat
104 //            30yr_03813.dat
105 //            30yr_03816.dat
106 //            30yr_03820.dat
107 //            30yr_03822.dat
108 //            30yr_03856.dat
109 //            30yr_03860.dat
110 //            30yr_03870.dat
111 //            30yr_03872.dat
112             
113             
114 //            (0,1985 07 31 02   200    94 10137   220    26     1     0 -9999)
115 //            (62,1985 07 31 03   172    94 10142   240     0     0     0 -9999)
116 //            (124,1985 07 31 04   156    83 10148   260    10     0     0 -9999)
117 //            (186,1985 07 31 05   133    78 -9999   250     0 -9999     0 -9999)
118 //            (248,1985 07 31 06   122    72 -9999    90     0 -9999     0     0)
119 //            (310,1985 07 31 07   117    67 -9999    60     0 -9999     0 -9999)
120 //            (371,1985 07 31 08   111    61 -9999    90     0 -9999     0 -9999)
121 //            (434,1985 07 31 09   111    61 -9999    60     5 -9999     0 -9999)
122 //            (497,1985 07 31 10   106    67 -9999    80     0 -9999     0 -9999)
123 //            (560,1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999)
124             
125 //            (03103,[200,172,156,133,122,117,111,111,106,100])
126             
127 //            根据自己业务需要 , map 函数的功能仅限于提取气象站和气温信息
128             
129             
130 //            1998        #year
131 //            03            #month
132 //            09            #day
133 //            17            #hour
134 //            11            #temperature            感兴趣
135 //            -100        #dew
136 //            10237        #pressure
137 //            60            #wind_direction    
138 //            72            #wind_speed
139 //            0            #sky_condition    
140 //            0            #rain_1h 
141 //            -9999        #rain_6h
142             
143             
144             if (temperature != -9999){//过滤无效数据    
145                 //第三步:提取气象站编号
146                 //获取输入分片
147                 FileSplit fileSplit = (FileSplit) context.getInputSplit();//提取问加你输入分片,并转换类型
148 //                    即由InputSplit   ->   FileSplit
149                 
150 //                context.getInputSplit()
151 //                (FileSplit) context.getInputSplit()这是强制转换
152 //                fileSplit的值是file:/D:/Code/MyEclipseJavaCode/myMapReduce/data/temperature/30yr_03870.dat:0+16357956
153 //                即,读的是30yr_03870.dat这个文件
154                 
155                 
156                 //然后通过文件名称提取气象站编号
157                 String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
158                         //首先通过文件分片fileSplit来获取文件路径,然后再获取文件名字,然后截取第5位到第10位就可以得到气象站 编号
159 //                        fileSplit.getPath()
160 //                        fileSplit.getPath().getName()
161                 
162 //                30yr_03870.dat   我们只需获取03870就是气象站编号
163 
164 //                        fileSplit.getPath().getName().substring(5, 10)   //从0开始,即第5个开始截取,到第10个为止,第10个没有拿到,所以为03870
165 //                weatherStationId是03870
166                 
167                 
168                 
169                 context.write(new Text(weatherStationId), new IntWritable(temperature));//写入weatherStationId是k2,temperature是v2
170 //                context.write(weatherStationId,temperature);等价    ,但是若是直接这样写会出错,因为,    weatherStationId是String类型,注意与Text类型还是有区别的!        
171                         //气象站编号,气温值
172             }
173         }
174     }
175 
176 
177     
178     public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable>{
179         private IntWritable result = new IntWritable();//存取结果
180                 //因为气温是IntWritable类型                       
181         public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{ 
182 //            Iterable<IntWritable> values是iterable(迭代器)变量
183             
184             
185 //            Iterable<IntWritable> values和IntWritable values这样有什么区别?
186 //            前者是iterable(迭代器)变量,后者是intwriteable(int的封装)变量
187             
188             
189 //            Iterable<IntWritable> values
190 //            迭代器,valuses是 iterable(迭代器)变量,类型是IntWritable
191             
192             
193                         //reduce输出的key,key的集合,context的实例
194             //第一步:统计相同气象站的所有气温
195             int sum = 0;
196             int count = 0;
197             for (IntWritable val : values) //星型for循环来循环同一个气象站的所有气温值,即将values的值一一传给IntWritable val 
198 //                IntWritable val是IntWritable(int的封装)变量
199                 
200             {//对所有气温值累加
201             sum += val.get();//去val里拿一个值,就sum下
202 
203 //            val.get()去拿值
204             
205                 count++;
206             }
207             result.set(sum / count);//设为v3
208 //            result.set(sum / count)去设置,将sum / count的值,设给result
209 //            sum是21299119   count是258616  =  82.3580869  
210             
211             
212             context.write(key,result);//写入key是k3,result是v3
213         }
214     }
215 
216     
217     
218     public int run(String[] args) throws Exception{
219         // TODO Auto-generated method stub
220         //第一步:读取配置文件
221         Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了
222         //Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
223         
224 //                new Configuration()
225 
226         //第二步:输出路径存在就先删除
227         Path mypath = new Path(args[1]);//定义输出路径的Path对象,mypath
228         
229         
230 //        new Path(args[1])将args[1]的值,给mypath
231         
232         FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。
233         //FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs
234         
235         if (hdfs.isDirectory(mypath))//如果输出路径存在
236         {
237             hdfs.delete(mypath, true);//则就删除
238         }
239         //第三步:构建job对象
240         Job job = new Job(conf, "temperature");//新建一个任务,job名字是tempreature
241         
242 //        new Job(conf, "temperature")有这么个构造方法
243         
244         job.setJarByClass(Temperature.class);// 设置主类
245         //通过job对象来设置主类Temperature.class
246         
247         //第四步:指定数据的输入路径和输出路径
248         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径,args[0]
249         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径,args[1]
250         
251         //第五步:指定Mapper和Reducer
252         job.setMapperClass(TemperatureMapper.class);// Mapper
253         job.setReducerClass(TemperatureReducer.class);// Reducer
254         
255         //第六步:设置map函数和reducer函数的输出类型
256         job.setOutputKeyClass(Text.class);
257         job.setOutputValueClass(IntWritable.class);    
258         
259         //第七步:提交作业
260         return job.waitForCompletion(true)?0:1;//提交任务
261     }
262 
263 
264     /**
265      * @function main 方法
266      * @param args
267      * @throws Exception
268      */
269     public static void main(String[] args) throws Exception {
270         //第一步
271 //        String[] args0 = 
272 //            {
273 //            "hdfs://djt002:9000/inputData/temperature/",
274 //                    "hdfs://djt002:9000/outData/temperature/"
275 //            };
276         
277         String[] args0 = {"./data/temperature/","./out/temperature/"};
278         
279 //        args0是输入路径和输出路径的属组
280         
281         //第二步
282         int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
283         
284 //        ToolRunner.run(new Configuration(), new Temperature(), args0)有这么一个构造方法
285         
286         //第一个参数是读取配置文件,第二个参数是主类Temperature,第三个参数是输入路径和输出路径的属组
287         System.exit(ec);
288     }
289 
290 }
291     

 

 

 

 

 

 

 

 TemperatureMapperTest.java

 1 package zhouls.bigdata.myMapReduce.TemperatureTest;
 2 
 3 import java.io.IOException;
 4 
 5 
 6 import org.apache.hadoop.io.IntWritable;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
11 import org.junit.Before;
12 import org.junit.Test;
13 
14 /**
15  * Mapper 端的单元测试,这里用MRUnit框架,需要使用mrunit-hadoop.jar
16  */
17 @SuppressWarnings("all")//告诉编译器忽略指定的警告,不用在编译完成后出现警告信息
18 public class TemperatureMapperTest{
19     private Mapper mapper;//定义一个Mapper对象,是mapper
20     private MapDriver driver;//定义一个MapDriver对象,是driver,因为是要MapDriver去做!
21     
22     @Before//@Before是在所拦截单元测试方法执行之前执行一段逻辑,读艾特Before
23     public void init(){//初始化方法init
24         mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
25         driver = new MapDriver(mapper);//实例化MapDriver对象
26     }
27     
28     
29     @Test//@Test是测试方法提示符,一般与@Before组合使用
30     public void test() throws IOException{
31     //因为测试的是Map
32         //输入一行测试数据
33         String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
34         driver.withInput(new LongWritable(), new Text(line))//withInput方法是第一行输入       跟TemperatureMapper输入类型一致
35               .withOutput(new Text("03103"), new IntWritable(200))//withOutput方法是输出    跟TemperatureMapper输出类型一致
36               .runTest();//runTest方法是调用运行方法
37     }
38 }

 

 

 

 

 

 

 

 

 

 

 

 

 

 TemperatureReduceTest.java代码

 1 package zhouls.bigdata.myMapReduce.TemperatureTest;
 2 
 3 import java.io.IOException;
 4 
 5 import java.util.ArrayList;
 6 import java.util.List;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Reducer;
10 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
11 import org.junit.Before;
12 import org.junit.Test;
13 
14 /**
15  * Reducer 单元测试,这里用MRUnit框架,需要使用mrunit-hadoop.jar
16  */
17 @SuppressWarnings("all")//告诉编译器忽略指定的警告,不用在编译完成后出现警告信息
18 public class TemperatureReduceTest{
19     private Reducer reducer;//定义一个Reducer对象

以上是关于Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop MapReduce编程 API入门系列之wordcount版本5

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

Hadoop MapReduce编程 API入门系列之mr编程快捷键活用技巧详解

Hadoop MapReduce编程 API入门系列之join(二十五)(未完)

Hadoop MapReduce编程 API入门系列之统计学生成绩版本1(十七)