剖析美国平均气温项目,掌握MapReduce编程

Posted zhoupp

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了剖析美国平均气温项目,掌握MapReduce编程相关的知识,希望对你有一定的参考价值。

数据集导入HDFS

技术图片

通过命令行访问刚刚上传至HDFS的数据集

[[email protected] hadoop-2.6.0]$ bin/hdfs dfs -ls /weather/

  技术图片

MapReduce程序编译及运行:

第一步:在 Map 阶段,提取气象站和气温数据

public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
		/**
		 * @function Mapper 解析气象站数据
		 * @input key=偏移量  value=气象站数据
		 * @output key=weatherStationId value=temperature
		 */
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			
			String line = value.toString(); //读取每行数据
			int temperature = Integer.parseInt(line.substring(14, 19).trim());//气温

			if (temperature != -9999) { //过滤无效数据	
				FileSplit fileSplit = (FileSplit) context.getInputSplit();
				String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
				context.write(new Text(weatherStationId), new IntWritable(temperature));
			}
		}
	}

第二步:在 Reduce 阶段,统计每个气象站的平均气温

/**
	 * 
	 * @function Reducer 统计美国各个州的平均气温
	 * @input key=weatherStationId  value=temperature
	 * @output key=weatherStationId value=average(temperature)
	 */
	public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			for (IntWritable val : values) {
				sum += val.get();
				count++;
			}
			result.set(sum / count);
			context.write(key, result);
		}
	}

第三步:对代码进行单元测试及debug调试。

Mapper单元测试

Mapper 的逻辑就是从读取的气象站数据中,提取气温值。比如读取一行"1985 07 31 02   200    94 10137   220    26     1     0 -9999"气象数据,提取第14位到19位之间的字符即为气温值200。

/**
 * Mapper 端的单元测试
 */
@SuppressWarnings("all")
public class TemperatureMapperTest {
	private Mapper mapper;//定义一个Mapper对象
	private MapDriver driver;//定义一个MapDriver 对象

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
		driver = new MapDriver(mapper);//实例化MapDriver对象
	}

	@Test
	public void test() throws IOException {
		//输入一行测试数据
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
				.withOutput(new Text("weatherStationId"), new IntWritable(200))//跟TemperatureMapper输出类型一致
				.runTest();
	}
}

Reduce单元测试       

Reduce 函数的逻辑就是把key相同的 value 值相加然后取平均值,Reducer 单元测试

/**
 * Reducer 单元测试
 */
@SuppressWarnings("all")
public class TemperatureReduceTest {
	private Reducer reducer;//定义一个Reducer对象	
	private ReduceDriver driver;//定义一个ReduceDriver对象

	@Before
	public void init() {
		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
		driver = new ReduceDriver(reducer);//实例化ReduceDriver对象
	}

	@Test
	public void test() throws IOException {
		String key = "weatherStationId";//声明一个key值
		List values = new ArrayList();
		values.add(new IntWritable(200));//添加第一个value值
		values.add(new IntWritable(100));//添加第二个value值
		driver.withInput(new Text("weatherStationId"), values)//跟TemperatureReducer输入类型一致
			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
			  .runTest();
	}
}

Mapper 和 Reducer 集成起来测试

/**
 * Mapper 和 Reducer 集成起来测试
 */
@SuppressWarnings("all")
public class TemperatureTest {
	private Mapper mapper;//定义一个Mapper对象
	private Reducer reducer;//定义一个Reducer对象	
	private MapReduceDriver driver;//定义一个MapReduceDriver 对象

	@Before
	public void init() {
		mapper = new Temperature.TemperatureMapper();//实例化一个Temperature中的TemperatureMapper对象
		reducer = new Temperature.TemperatureReducer();//实例化一个Temperature中的TemperatureReducer对象
		driver = new MapReduceDriver(mapper, reducer);//实例化MapReduceDriver对象
	}

	@Test
	public void test() throws RuntimeException, IOException {
		//输入两行行测试数据
		String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
		String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";
		driver.withInput(new LongWritable(), new Text(line))//跟TemperatureMapper输入类型一致
			  .withInput(new LongWritable(), new Text(line2))
			  .withOutput(new Text("weatherStationId"), new IntWritable(150))//跟TemperatureReducer输出类型一致
			  .runTest();
	}
}

第四步:将项目编译和打包为Tempearture.jar,使用客户端将 Tempearture.jar上传至hadoop的/home/hadoop/Temp目录下。

技术图片

第五步:使用cd /home/hadoop/Temp 切换到当前目录,通过hadoop jar Temperature.jar com.hadoop.base.Temperature /weather/ /weather/out/命令行执行任务。

第六步:任务的最终结果输出到 HDFS ,使用hadoop fs -cat /weather/out/part-r-00000命令查看结果。

以上是关于剖析美国平均气温项目,掌握MapReduce编程的主要内容,如果未能解决你的问题,请参考以下文章

如何用hadoop统计美国气象局的最高气温

114 11 个案例掌握 Python 数据可视化--美国气候研究

环境大数据MapReduce

环境大数据MapReduce

环境大数据MapReduce

天津东软实训第五天