环境大数据MapReduce

Posted 一 研 为定

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了环境大数据MapReduce相关的知识,希望对你有一定的参考价值。

目录

一、题目要求

第一题:月平均气温统计

 第二题:每日空气质量统计

第三题:各空气质量分类天数统计

二、问题思路

(一)、月平均气温统计

1、思路

2、代码

(二)、每日空气质量统计

1、思路

2、代码

(三)、各空气质量分类天数统计

1、思路

2、代码


一、题目要求

原数据如下:

第一题:月平均气温统计

得到示例结果如下:

 第二题:每日空气质量统计

示例结果如下:

第三题:各空气质量分类天数统计

结果示例如下:

下面我将对这三个问题进行详细解答。

二、问题思路

(一)、月平均气温统计

1、思路

        这道题的解题思路是,要求每月的平均气温,但数据给的是每天的气温,所以需要将每天的数据分片,将每日编号(20160101)切片成每月编号(201601),这样再进行对每月的气温取平均值就可以了。

        写这道题之前可以先把mapper和reducer里的k,v都想好了,然后再写。mapper里是k1,v1,k2,v2,reducer里是k3,v3,k4,v4。如下:

k1  0行,1行     偏移量                                              LongWritable

v1  每一行的数据                                                       Text 

k2  201601                                                                 Text

v2  201601   对应的所有气温  -2,-3,。。。           IntWritable

k3  201601                                                                 Text

v3 {-2,-3,-3。。。}                                                IntWritable

k4 201601,201602。。。                                       Text

v4 avg(每月气温平均值)                                       FloatWritable

avg = total/count

2、代码

配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ST1-weather</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
    </dependencies>


</project>

mapper:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MavgtmpMapper extends Mapper<LongWritable, Text,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //忽略首行
        //用key判断,第一行的key一定是0
        if (key.get()==0) {
            return;
        }
        //字符串类型转成java类型
        String data = value.toString();
        //切片成数组,一共有9个元素
        String[] msgs = data.split(",");
        //切片,将20160101(精确到日期),变成201601(精确到月)
        if (!(msgs[5].equals("N/A"))){
            String s1 = msgs[0].substring(0,6);
            context.write(new Text(s1),new IntWritable(Integer.parseInt(msgs[5])));
        }
    }
}

reducer

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MavgtmpReducer extends Reducer<Text, IntWritable,Text, FloatWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //把处理的每一个201601的value值都加起来
        int total = 0;
        int count = 0;
        for (IntWritable v:values){
            //把IntWritable类型转化为int型
            count++;
            total += v.get();
        }
        float avg = total/count;
        //total由int类型变为FloatWritable类型
        context.write(key,new FloatWritable(avg));
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MavgtmpMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        //程序主类
        job.setJarByClass(MavgtmpMain.class);
        //Mapper类的相关设置
        job.setMapperClass(MavgtmpMapper.class);
        //Map输出key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //Reducer类的相关设置
        job.setReducerClass(MavgtmpReducer.class);
        //程序运行输出key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        //设置输入,输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交任务,并等待任务运行完成
        job.waitForCompletion(true);
    }
}

(二)、每日空气质量统计

1、思路

求每日空气质量统计,由题目可以确定k4是每日编号,v4是每日空气质量AQI,由于每日每时有多个AQI,所以应取平均值。由此可以推出k3是每日编号,v3是每日AQI的集合,那末k2也能确定是每日编号,v2是每日编号对应的AQI。

k1  0行,1行                                                       LongWritable

v1  每一行的数据                                                 Text 

k2  20160101                                                      Text

v2  20160101对应的AQI值,即msg[6],212      IntWritable

k3  20160101                                                      Text

v3 {212,209,204,194。。。}                         IntWritable

k4 20160101,20160102,20160103.。。          Text

v4 avg(每天AQI平均值)                                    IntWritable

avg = total/count

2、代码

mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MavgaqiMapper extends Mapper<LongWritable, Text,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //忽略首行
        //用key判断,第一行的key一定是0
        if (key.get()==0) {
            return;
        }
        //字符串类型转成java类型
        String data = value.toString();
        //切片成数组,一共有9个元素
        String[] msgs = data.split(",");

        if (!(msgs[6].equals("N/A"))){
            context.write(new Text(msgs[0]),new IntWritable(Integer.parseInt(msgs[6])));
        }
    }
}

reducer

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MavgaqiReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //把处理的每一个201601的value值都加起来
        int total = 0;
        int count = 0;
        int avg = 0;
        for (IntWritable v:values){
            //把IntWritable类型转化为int型
            count++;
            total += v.get();
        }
        avg = total/count;
        //total由int类型变为FloatWritable类型
        context.write(key,new IntWritable(avg));
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MavgaqiMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        //程序主类
        job.setJarByClass(MavgaqiMain.class);
        //Mapper类的相关设置
        job.setMapperClass(MavgaqiMapper.class);
        //Map输出key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //Reducer类的相关设置
        job.setReducerClass(MavgaqiReducer.class);
        //程序运行输出key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入,输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交任务,并等待任务运行完成
        job.waitForCompletion(true);
    }
}

(三)、各空气质量分类天数统计

1、思路

问题是求解各空气质量分类天数,可知k4是空气质量,v4是天数,可以借助第二题求得的数据,第二题求的数据第一列是天数,第二列是该天的AQI,可以根据AQI的大小评估出该天的空气质量等级,如下图:,并将评估出的等级放入字符串s。即k2为空气质量等级,v2赋值为1;

k1  问题二的 0、1行                                              LongWritable

v1  每一行的数据                                                   Text 

k2  优、良、轻度污染                                            Text

v2  1                                                                      IntWritable

k3  优、良                                                              Text

v3 {1,1,1...}                                                        IntWritable

k4 优,良                                                             Text

v4 count求和                                                      IntWritable

读第二个文件,的时候把text类型转发为integer类型,区分不同等级,分类写出来,把数据改成分为类的字符,

2、代码

mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class McountaqiMapper extends Mapper<LongWritable, Text,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //忽略首行
        //用key判断,第一行的key一定是0
        //if (key.get()==0) {
           // return;
      //  }
        //字符串类型转成java类型
        String data = value.toString();
        //切片成数组
        String[] msgs = data.split("\\t");
       //索引下标为1的数值,并赋值给aqi
        
        int aqi = Integer.parseInt(msgs[1]);
        //s是字符串
        String s = "";
        if (aqi<=50){
            s = "优";
        }
        else if (aqi<=100){
            s = "良";
        }
        else if (aqi<=150){
            s = "轻度污染";
        }
        else if (aqi<=200){
            s = "中度污染";
        }
        else if (aqi<=300){
            s = "重度污染";
        }
        else {
            s = "严重污染";
        }
        context.write(new Text(s),new IntWritable(1));
    }
}

reducer

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class McountaqiReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    @Override
    protected void reduce(Text s, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //把处理的每一个201601的value值都加起来
        int daycount = 0;
        for (IntWritable v:values) {
            //把IntWritable类型转化为int型
            daycount += v.get();
        }
        context.write(s,new IntWritable(daycount));
    }
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class McountaqiMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        //程序主类
        job.setJarByClass(McountaqiMain.class);
        //Mapper类的相关设置
        job.setMapperClass(McountaqiMapper.class);
        //Map输出key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //Reducer类的相关设置
        job.setReducerClass(McountaqiReducer.class);
        //程序运行输出key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入,输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交任务,并等待任务运行完成
        job.waitForCompletion(true);
    }
}

 题都不难,好好思考就能解出来,加油!!!

以上是关于环境大数据MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

环境大数据MapReduce

环境大数据MapReduce

hadoop大数据分析

大数据学习之七——MapReduce简单代码实例

大数据关键技术——MapReduce

大数据之Hadoop(MapReduce):MapReduce扩展案例