手写MapReduce程序详细操作步骤

Posted JeffchenITM

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写MapReduce程序详细操作步骤相关的知识,希望对你有一定的参考价值。

目录

1.创建Maven工程

2.编写mr程序

3. 在window本地运行mr程序

4. 在Linux集群上面运行手写mr程序

目标:在Maven工程中自己手写Mr程序,并实现在windows本地运行,和在Linux集群上面运行。(包含所有细节部分,对新手十分友好)

1.创建Maven工程

1.1 在java项目下新建一个Maven工程:File---new---module,选择Maven,

点击Next。参考如下图

设置maven工程的GroupId和ArtifactId,这里可以选择自己创建名字,参考图如下

1.2  在pom.xml文件中添加如下依赖

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>

1.3 在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <!-- 类型名为Console,名称为必须属性 -->
        <Appender type="Console" name="STDOUT">
            <!-- 布局为PatternLayout的方式,
            输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
            <Layout type="PatternLayout"
                    pattern="[%p] [%dyyyy-MM-dd HH:mm:ss][%c10]%m%n" />
        </Appender>

    </Appenders>

    <Loggers>
        <!-- 可加性为false -->
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>

        <!-- root loggerConfig设置 -->
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>

</Configuration>

2.编写mr程序

在MapReduce工程源码目录下创建三个类:

WordCountMapper  WordCountReducer  WordCountDriver

目录结构参考如下

 依次编写三个类的程序

2.1 WordCountMapper类

package com.atguigu.mr;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 以WordCount案例 为例:
 * 自定义的Mapper类 需要继承Hadoop提供的Mapper 并且根据具体业务指定输入数据和输出数据的数据类型
 *
 * 输入数据的类型
 * KEYIN,  读取文件的偏移量  数字(LongWritable)
 * VALUEIN, 读取文件的一行数据  文本(Text)
 *
 * 输出数据的类型
 * KEYOUT,  输出数据的key的类型 就是一个单词(Text)
 * VALUEOUT 输出数据value的类型 给单词的标记 1 数字(IntWritable)
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> 

    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
        System.out.println("setup方法执行了");
    

    private Text outk = new Text();
    private IntWritable outv = new IntWritable(1);


    /**
     * Map阶段的核心业务处理方法,每输入一行数据会调用一次map方法
     * @param key 输入数据的key
     * @param value 输入数据的value
     * @param context 上下文对象
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        // 获取当前输入的数据
        String line = value.toString();
        // 切割数据
        String[] datas = line.split(" ");
        // 遍历集合 封装 输出数据的key和value
        for (String data : datas) 
            outk.set(data);
            context.write(outk, outv);
        
        System.out.println("map方法执行了");
    


    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException 
        System.out.println("cleanup执行了");
    
2.2 WordCountReducer类
package com.atguigu.mr;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * 以WordCount案例 为例:
 * 自定义的Reducer类 需要继承Hadoop提供的Reducer 并且根据具体业务指定输入数据和输出数据的数据类型
 *
 * 输入数据的类型
 * KEYIN,  Map端输出的key的数据类型
 * VALUEIN, Map端输出的value的数据类型
 *
 * 输出数据的类型
 * KEYOUT,  输出数据的key的类型 就是一个单词(Text)
 * VALUEOUT 输出数据value的类型 单词出现的总次数(IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> 

    private Text outk = new Text();
    private IntWritable outv = new IntWritable();


    /**
     *  Reduce阶段的核心业务处理方法, 一组相同key的values会调用一次reduce方法
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
        int total = 0;
        // 遍历values
        for (IntWritable value : values) 
            // 对value累加进行累加 输出结果
            total+=value.get();
        
        // 封装key和value
        outk.set(key);
        outv.set(total);
        context.write(outk, outv);
    

2.3 WordCountDriver类

package com.atguigu.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * MR程序的驱动类:主要用于提交MR任务
 */
public class WordCountDriver 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
        // 声明配置对象
        Configuration conf = new Configuration();
        // 声明Job对象
        Job job = Job.getInstance(conf);
        // 指定当前Job的驱动类
        job.setJarByClass(WordCountDriver.class);
        // 指定当前Job的 Mapper和Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 指定Map段输出数据的key的类型和输出数据value的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 指定最终输出结果的key的类型和value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 指定输入数据的目录 和 输出数据的目录(windows本地运行)
        FileInputFormat.setInputPaths(job, new Path("E:\\\\wcinput\\\\"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\\\wcoutput1"));

      //这两行注释是在linux集群上面运行时使用,将上面两行注释掉
      //  FileInputFormat.setInputPaths(job, new Path(args[0]));  
      // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交Job
        job.waitForCompletion(true);
    

3. 在window本地运行mr程序

3.1 准备测试文件

要准备的测试文件 hello.txt 内容如下,复制内容到你自己的电脑上面,注意更改WordCountDriver程序中文件输入路径。

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop
sgg sgg sgg
nihao nihao
bigdata0111
laiba

注意:输入路径不需要指定到hello.txt文件,指定到hello.txt所在的文件夹。输出结果文件路径最后一层在电脑中不需要提前建好文件夹,在运行是会自动建文件夹,在指定路径下不能有与输出文件路径重名的文件夹,每次运行都要重新换一个新文件夹名。

3.2 程序编写完成后,运行WordCountDriver程序的main方法,成功输出结果文件。参考图如下

 打开part-00000文件,信息如下

 到此为止,在windows本地运行结束

4. 在Linux集群上面运行手写mr程序

在Linux上面启动hdfs集群,在命令行输入jps显示如下:

打开web端(hadoop102:9870和hadoop103:8088),在跟目录下面新建文件夹wcinput如下图所示

 

点击wcinput进入该文件夹中,点击上传图标,将本地hello.txt文件传入该文件夹。如下图所示

 回到maven工程WordCountDriver.java中,将原来输入数据目录和输出数据目录注释掉,将下面两行程序解开。如下图所示。

 这里解释一下为什么输入输出目录变成args[0]和args[1]了:在主程序入口,设置有接受参数,待会再linux环境下运行时,jar包后面跟的第一个参数就是输入数据路径,第二个参数是输出数据目录。

做完这之后,打开MapReduce的生命周期目录,点击package完成对程序的打包。如下图所示

 我们可以在MapReduce工程target目录下看见我们打完包的程序,参考图如下

 然后打开shell窗口,再Hadoop102服务器hadoop-3.1.3目录下,报这个jar包直接拉过来(鼠标左键点击jar包拖到该窗口)。然后在该目录下面输入ll查看jar包是否在当前目录下。如下图所示

跑MR程序之前先确定一下集群是否正常开启,输入jps命令,如下图所示则正常开启

然后输入下面命令开始执行Mr程序

hadoop jar MapReduce-1.0-SNAPSHOT.jar com.atguigu.mr.WordCountDriver /wcinput  /wcoutput

针对上面这行指令,有如下补充①②③条解释:

 ①:是要执行jar包里的程序路径参照下图步骤复制贴过去:

②处的指令会被带入maven程序中arg[0]

③处的指令会被带入maven程序中arg[1]

到这里,linux上面的mr程序就已经开始跑起来了,这时候我们可以切换到打开的web端(hadoop103:8088)查看程序执行情况。如下图所示

 等待程序跑完,我们去wcoutput文件夹下便可以查看程序结果

 点进去看结果文件,如下图所示

 

 至此,在linux上运行手写mr程序完成

以上是关于手写MapReduce程序详细操作步骤的主要内容,如果未能解决你的问题,请参考以下文章

mapreduce运行流程总结

Mybatis反向工程详细步骤

怎样找到mysql的用户名和密码

EXCEL提示找不到应用程序

怎样用易语言打开exe文件

devc++ 如何看输出结果