MapReduce应用实例

Posted wtq1993

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce应用实例相关的知识,希望对你有一定的参考价值。


1.各个部门的总工资如何进行问题分析的?
2.各个部门的总工资处理流程是怎样的?
3.个部门的人数和平均工资又是如何得到的?
4.个部门的人数和平均工资代码如何实现?


技术分享



案例所用包全部下载:链接: http://pan.baidu.com/s/1sjNyDIX 密码:
游客,如果您要查看本帖隐藏内容请回复

1、环境说明
部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下创建/app目录,并修改该目录拥有者为shiyanlou(chown –R shiyanlou:shiyanlou /app)。
Hadoop搭建环境:
l  虚拟机操作系统: CentOS6.6  64位,单核,1G内存
l  JDK:1.7.0_55 64位
l  Hadoop:1.1.2
2、准备测试数据
测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:
dept文件内容:
[Bash shell] 纯文本查看 复制代码
1
2
3
4
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

emp文件内容:
[Bash shell] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10

在/home/shiyanlou/install-pack/class6目录可以找到这两个文件,把这两个文件上传到HDFS中/class6/input目录中,执行如下命令:
[Bash shell] 纯文本查看 复制代码
1
2
3
4
5
cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input

技术分享
3、应用案例3.1 测试例子1:求各个部门的总工资3.1.1 问题分析
MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。
3.1.2 处理流程图
技术分享
3.1.3 测试代码
Q1SumDeptSalary.java代码(vi编辑代码是不能存在中文):

[Java] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class Q1SumDeptSalary extends Configured implements Tool {
 
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用于缓存 dept文件中的数据
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法会在Map方法执行之前执行且执行一次
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
 
                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
 
                    // 对部门文件字段进行拆分并缓存到deptMap中
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
                             
                            // 对部门文件字段进行拆分并缓存到deptMap中
                            // 其中Map中key为部门编号,value为所在部门名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 
            // 对员工文件字段进行拆分
            kv = value.toString().split(",");
 
            // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
            if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }
 
    public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
 
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
            // 对同一部门的员工工资进行求和
            long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }
 
            // 输出key为部门名称和value为该部门员工工资总和
            context.write(key, new LongWritable(sumSalary));
        }
    }
 
    @Override
    public int run(String[] args) throws Exception {
 
        // 实例化作业对象,设置作业名称、Mapper和Reduce类
        Job job = new Job(getConf(), "Q1SumDeptSalary");
        job.setJobName("Q1SumDeptSalary");
        job.setJarByClass(Q1SumDeptSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);
 
        // 设置输出格式
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
    String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
 
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,执行入口
     * @param args 输入参数
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
        System.exit(res);
    }
}


3.1.4 编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q1SumDeptSalary.java程序代码(代码页可以使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)
cd /app/hadoop-1.1.2/myclass/class6
vi Q1SumDeptSalary.java
编译代码
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java
把编译好的代码打成jar包(如果不打成jar形式运行会提示class无法找到的错误)
jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
mv *.jar ../..
rm Q1SumDept*.class
技术分享
3.1.5 运行并查看结果
运行Q1SumDeptSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:
l  部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
l  员工数据路径:hdfs://hadoop:9000/class6/input/emp
l  输出路径:hdfs://hadoop:9000/class6/out1

运行如下命令:
cd /app/hadoop-1.1.2
hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1
技术分享
运行成功后,刷新CentOS HDFS中的输出路径/class6/out1目录,打开part-r-00000文件
hadoop fs -ls /class6/out1
hadoop fs -cat /class6/out1/part-r-00000
可以看到运行结果:
ACCOUNTING8750
RESEARCH6775
SALES  9400
技术分享
3.2 测试例子2:求各个部门的人数和平均工资3.2.1 问题分析
求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。
3.2.2 处理流程图
技术分享
3.2.3 编写代码
Q2DeptNumberAveSalary.java代码:

[Java] 纯文本查看 复制代码
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class Q2DeptNumberAveSalary extends Configured implements Tool {
 
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用于缓存 dept文件中的数据
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法会在Map方法执行之前执行且执行一次
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
 
                    // 对部门文件字段进行拆分并缓存到deptMap中
                    if (path.toString().contains("dept")) {
        &n




















以上是关于MapReduce应用实例的主要内容,如果未能解决你的问题,请参考以下文章

片段事务中的实例化错误

MapReduce 应用实例

大数据学习之七——MapReduce简单代码实例

一文读懂MapReduce 附流量解析实例

片段 null 必须是公共静态类才能从实例状态正确重新创建

MapReduce编程实例5