Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)

Posted 世界那么大,我想去看看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)相关的知识,希望对你有一定的参考价值。

1  运行环境说明

1.1 硬软件环境

1.2 机器网络环境

2  书面作业1:计算员工相关

2.1 书面作业1内容

2.2  实现过程

2.2.1   准备测试数据

2.2.2   问题1:求各个部门的总工资

2.2.3   问题2:求各个部门的人数和平均工资

2.2.4   问题3:求每个部门最早进入公司的员工姓名

2.2.5   问题4:求各个城市的员工的总工资

2.2.6   问题5:列出工资比上司高的员工姓名及其工资

2.2.7   问题6:列出工资比公司平均工资要高的员工姓名及其工资

2.2.8   问题7:列出名字以J开头的员工姓名及其所属部门名称

2.2.9   问题8:列出工资最高的头三名员工姓名及其工资

2.2.10问题9:将全体员工按照总收入(工资+提成)从高到低排列

2.2.11问题10:求任何两名员工信息传递所需要经过的中间节点数

3  书面作业2:MAPREDUCE实现推荐系统

3.1 书面作业2内容

3.2     程序代码

3.2.1   CountThread.java

3.2.2   Recommendation.java

3.3 准备数据

3.4 实现过程

3.4.1   问题分析

3.4.2   处理流程图

3.4.3   配置运行参数

3.4.4   运行作业

 

1 运行环境说明

1.1  硬软件环境

l  主机操作系统:Windows 64 bit,双核4线程,主频2.2G,6G内存

l  虚拟软件:VMware® Workstation 9.0.0 build-812388

l  虚拟机操作系统:CentOS 64位,单核,1G内存

l  JDK:1.7.0_55 64 bit

l  Hadoop:1.1.2

1.2  机器网络环境

集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。节点IP地址和主机名分布如下:

序号

IP地址

机器名

类型

用户名

运行进程

1

10.88.147.221

hadoop1

名称节点

hadoop

NN、SNN、JobTracer

2

10.88.147.222

hadoop2

数据节点

hadoop

DN、TaskTracer

3

10.88.147.223

hadoop3

数据节点

hadoop

DN、TaskTracer

所有节点均是CentOS6.5 64bit系统,防火墙均禁用,所有节点上均创建了一个hadoop用户,用户主目录是/usr/hadoop。所有节点上均创建了一个目录/usr/local/hadoop,并且拥有者是hadoop用户。

2 书面作业1:计算员工相关

2.1  书面作业1内容

(本题10选2)把作业素材demo.txt中的两个表数据用适当的方式导入hadoop(来自Oracle数据库的样板表,可考虑分成2个文件存放,注意空值的处理)

书写Map-Reduce程序,求出以下结果

1) 求各个部门的总工资

2) 求各个部门的人数和平均工资

3) 求每个部门最早进入公司的员工姓名

4) 求各个城市的员工的总工资

5) 列出工资比上司高的员工姓名及其工资

6) 列出工资比公司平均工资要高的员工姓名及其工资

7) 列出名字以J开头的员工姓名及其所属部门名称

8) 列出工资最高的头三名员工姓名及其工资

9) 将全体员工按照总收入(工资+提成)从高到低排列,要求列出姓名及其总收入

10) 如果每位员工只能和他的直接上司,直接下属,同一部门的同事交流,求任何两名员工之间若要进行信息传递所需要经过的中间节点数。请评价一下这个问题是否适合使用map-reduce解决

2.2  实现过程

2.2.1准备测试数据

2.2.1.1拆分文件

把提供的测试数据第7-8周作业素材.txt按照要求拆分成两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:

dept文件内容:

10,ACCOUNTING,NEW YORK

20,RESEARCH,DALLAS

30,SALES,CHICAGO

40,OPERATIONS,BOSTON

 

emp文件内容:

7369,SMITH,CLERK,7902,17-12月-80,800,,20

7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30

7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30

7566,JONES,MANAGER,7839,02-4月-81,2975,,20

7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30

7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30

7782,CLARK,MANAGER,7839,09-6月-81,2450,,10

7839,KING,PRESIDENT,,17-11月-81,5000,,10

7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30

7900,JAMES,CLERK,7698,03-12月-81,950,,30

7902,FORD,ANALYST,7566,03-12月-81,3000,,20

7934,MILLER,CLERK,7782,23-1月-82,1300,,10

2.2.1.2上传测试文件

使用SSH工具(参见第1、2周2.1.3.1Linux文件传输工具所描述)把dept和emp两个文件上传到本地目录/usr/local/hadoop-1.1.2/input中,然后使用eclipse的HDFS插件工具上传该文件到/usr/hadoop/in目录中,如下图所示:

 

2.2.2问题1:求各个部门的总工资

2.2.2.1问题分析

MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join 等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join 在处理多个小表关联大表时非常有用 。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

在下面代码中,将会把数据量小的表(部门dept )缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。

2.2.2.2处理流程图

 

2.2.2.3编写代码

Q1SumDeptSalary.java代码:

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

publicclass Q1SumDeptSalary extends Configured implements Tool {

 

    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {

 

        // 用于缓存 dept文件中的数据

        private Map<String, String> deptMap = new HashMap<String, String>();

        private String[] kv;

 

        // 此方法会在Map方法执行之前执行且执行一次

        @Override

        protectedvoid setup(Context context) throws IOException, InterruptedException {

            BufferedReader in = null;

            try {

 

                // 从当前作业中获取要缓存的文件

                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());

                String deptIdName = null;

                for (Path path : paths) {

 

                    // 对部门文件字段进行拆分并缓存到deptMap中

                    if (path.toString().contains("dept")) {

                        in = new BufferedReader(new FileReader(path.toString()));

                        while (null != (deptIdName = in.readLine())) {

                           

                            // 对部门文件字段进行拆分并缓存到deptMap中

                            // 其中Map中key为部门编号,value为所在部门名称

                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);

                        }

                    }

                }

            } catch (IOException e) {

                e.printStackTrace();

            } finally {

                try {

                    if (in != null) {

                        in.close();

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

 

publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

            // 对员工文件字段进行拆分

            kv = value.toString().split(",");

 

            // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资

            if (deptMap.containsKey(kv[7])) {

                if (null != kv[5] && !"".equals(kv[5].toString())) {

                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));

                }

            }

        }

    }

 

    publicstaticclass Reduce extends Reducer<Text, Text, Text, LongWritable> {

 

publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

 

            // 对同一部门的员工工资进行求和

            long sumSalary = 0;

            for (Text val : values) {

                sumSalary += Long.parseLong(val.toString());

            }

 

            // 输出key为部门名称和value为该部门员工工资总和

            context.write(key, new LongWritable(sumSalary));

        }

    }

 

    @Override

    publicint run(String[] args) throws Exception {

 

        // 实例化作业对象,设置作业名称、Mapper和Reduce类

        Job job = new Job(getConf(), "Q1SumDeptSalary");

        job.setJobName("Q1SumDeptSalary");

        job.setJarByClass(Q1SumDeptSalary.class);

        job.setMapperClass(MapClass.class);

        job.setReducerClass(Reduce.class);

 

        // 设置输入格式类

        job.setInputFormatClass(TextInputFormat.class);

 

        // 设置输出格式

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径

       String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();

       DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

 

        job.waitForCompletion(true);

        return job.isSuccessful() ? 0 : 1;

    }

 

    /**

     * 主方法,执行入口

     * @param args 输入参数

     */

    publicstaticvoid main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);

        System.exit(res);

    }

}

 

2.2.2.4配置运行参数

新建一个Java应用运行程序,需要在Arguments页签填写Q1SumDeptSalary运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l  部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l  员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q1

 

2.2.2.5运行并查看结果

设置运行参数完毕后,点击运行按钮:

 

运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q1目录,打开part-r-00000文件,可以看到运行结果:

ACCOUNTING8750

RESEARCH6775

SALES  9400

 

2.2.3问题2:求各个部门的人数和平均工资

2.2.3.1问题分析

求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。

2.2.3.2处理流程图

 

2.2.3.3编写代码

Q2DeptNumberAveSalary.java代码:

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

publicclass Q2DeptNumberAveSalary extends Configured implements Tool {

 

    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {

 

        // 用于缓存 dept文件中的数据

        private Map<String, String> deptMap = new HashMap<String, String>();

        private String[] kv;

 

        // 此方法会在Map方法执行之前执行且执行一次

        @Override

        protectedvoid setup(Context context) throws IOException, InterruptedException {

            BufferedReader in = null;

            try {

                // 从当前作业中获取要缓存的文件

                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());

                String deptIdName = null;

                for (Path path : paths) {

 

                    // 对部门文件字段进行拆分并缓存到deptMap中

                    if (path.toString().contains("dept")) {

                        in = new BufferedReader(new FileReader(path.toString()));

                        while (null != (deptIdName = in.readLine())) {

                           

                            // 对部门文件字段进行拆分并缓存到deptMap中

                            // 其中Map中key为部门编号,value为所在部门名称

                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);

                        }

                    }

                }

            } catch (IOException e) {

                e.printStackTrace();

            } finally {

                try {

                    if (in != null) {

                        in.close();

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

 

        publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

            // 对员工文件字段进行拆分

            kv = value.toString().split(",");

 

            // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资

            if (deptMap.containsKey(kv[7])) {

                if (null != kv[5] && !"".equals(kv[5].toString())) {

                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));

                }

            }

        }

    }

 

    publicstaticclass Reduce extends Reducer<Text, Text, Text, Text> {

 

        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

 

            long sumSalary = 0;

            int deptNumber = 0;

 

            // 对同一部门的员工工资进行求和

            for (Text val : values) {

                sumSalary += Long.parseLong(val.toString());

                deptNumber++;

            }

 

            // 输出key为部门名称和value为该部门员工工资平均值

            context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));

        }

    }

 

    @Override

    publicint run(String[] args) throws Exception {

 

        // 实例化作业对象,设置作业名称、Mapper和Reduce类

        Job job = new Job(getConf(), "Q2DeptNumberAveSalary");

        job.setJobName("Q2DeptNumberAveSalary");

        job.setJarByClass(Q2DeptNumberAveSalary.class);

        job.setMapperClass(MapClass.class);

        job.setReducerClass(Reduce.class);

 

        // 设置输入格式类

        job.setInputFormatClass(TextInputFormat.class);

 

        // 设置输出格式类

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();

        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

 

        job.waitForCompletion(true);

        return job.isSuccessful() ? 0 : 1;

    }

 

    /**

     * 主方法,执行入口

     * @param args 输入参数

     */

    publicstaticvoid main(String[] args) throws Exception {

        int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);

        System.exit(res);

    }

}

2.2.3.4配置运行参数

新建一个Java应用运行程序,需要在Arguments页签填写Q2DeptNumberAveSalary运行的部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

l  部门数据路径:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率

l  员工数据路径:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  输出路径:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q2

 

2.2.3.5运行并查看结果

设置运行参数完毕后,点击运行按钮:

 

运行成功后,刷新CentOS HDFS中的输出路径/usr/hadoop/out/week7_q2目录,打开part-r-00000文件,可以看到运行结果:

ACCOUNTINGDept Number:3,Ave Salary:2916

RESEARCHDept Number:3,Ave Salary:2258

SALES  Dept Number:6,Ave Salary:1566

 

2.2.4问题3:求每个部门最早进入公司的员工姓名

2.2.4.1问题分析

求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所有员工,找出最早进入公司的员工并输出。

2.2.4.2处理流程图

 

2.2.4.3编写代码

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

import java.text.DateFormat;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

publicclass Q3DeptEarliestEmp extends Configured implements Tool {

 

    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {

 

        // 用于缓存 dept文件中的数据

        private Map<String, String> deptMap = new HashMap<String, String>();

        private String[] kv;

 

        // 此方法会在Map方法执行之前执行且执行一次

        @Override

        protectedvoid setup(Context context) throws IOException, InterruptedException {

            BufferedReader in = null;

            try {

                // 从当前作业中获取要缓存的文件

                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());

                String deptIdName = null;

                for (Path path : paths) {

                    if (path.toString().contains("dept")) {

                        in = new BufferedReader(new FileReader(path.toString()));

                        while (null != (deptIdName = in.readLine())) {

 

                            // 对部门文件字段进行拆分并缓存到deptMap中

                            // 其中Map中key为部门编号,value为所在部门名称

                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);

                        }

                    }

                }

            } catch (IOException e) {

                e.printStackTrace();

            } finally {

                try {

                    if (in != null) {

                        in.close();

                    }

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

 

        publicvoid map(LongWritable key, Text value, Context context) throws IOException,     InterruptedException {

 

            // 对员工文件字段进行拆分

            kv = value.toString().split(",");

 

            // map join: 在map阶段过滤掉不需要的数据

            // 输出key为部门名称和value为员工姓名+","+员工进入公司日期

            if (deptMap.containsKey(kv[7])) {

                if (null != kv[4] && !"".equals(kv[4].toString())) {

                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim()                    + "," + kv[4].trim()));

                }

            }

        }

    }

 

    publicstaticclass Reduce extends Reducer<Text, Text, Text, Text> {

 

        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException,        InterruptedException {

 

            // 员工姓名和进入公司日期

            String empName = null;

            String empEnterDate = null;

 

            // 设置日期转换格式和最早进入公司的员工、日期

            DateFormat df = new SimpleDateFormat("dd-MM月-yy");

 

            Date earliestDate = new Date();

            String earliestEmp = null;

 

            // 遍历该部门下所有员工,得到最早进入公司的员工信息

            for (Text val : values) {

                empName = val.toString().split(",")[0];

                empEnterDate = val.toString().split(",")[1].toString().trim();

                try {

                    System.out.println(df.parse(empEnterDate));

                    if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {

                        earliestDate = df.parse(empEnterDate);

                        earliestEmp = empName;

                    }

                } catch (ParseException e) {

                    e.printStackTrace();

                }

            }

 

            // 输出key为部门名称和value为该部门最早进入公司员工

            context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter            date:" + newSimpleDateFormat("yyyy-MM-dd").format(earliestDate)));

        }

    }

 

    @Override

    publicint run(String[] args) throws Exception {

 

        // 实例化作业对象,设置作业名称

        Job job = new Job(getConf(), "Q3DeptEarliestEmp");

        job.setJobName("Q3DeptEarliestEmp");

 

        // 设置Mapper和Reduce类

        job.setJarByClass(Q3DeptEarliestEmp.class);

        job.setMapperClass(MapClass.class);

        job.setReducerClass(Reduce.class);

 

        // 设置输入格式类

        job.setInputFormatClass(TextInputFormat.class);

 

        // 设置输出格式类

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径

        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();

        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

 

        job.waitForCompletion(true);

        returnjob.isSuccessful() ? 0 : 1;

    }

 

    /**

     * 主方法,执行入口

     * @param args 输入参数

     */

    publicstaticvoid main(String[] args) <

以上是关于Hadoop第7周练习—MapReduce进行数据查询和实现推简单荐系统(转)的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop集群(第9期)_MapReduce初级案例

Hadoop集群(第9期)_MapReduce初级案例

[hadoop]怎么把两个mapreduce工程合起来

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客

成功运行第一个MapReduce任务

大数据之Hadoop(MapReduce): MapReduce框架原理