2021年大数据Hadoop(十七):MapReduce编程规范及示例编写

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年大数据Hadoop(十七):MapReduce编程规范及示例编写相关的知识,希望对你有一定的参考价值。

全网最详细的Hadoop文章系列,强烈建议收藏加关注!

后面更新文章都会列出历史文章目录,帮助大家回顾知识重点。

目录

本系列历史文章

前言

MapReduce编程规范及示例编写

编程规范

Map阶段2个步骤

Shuffle 阶段 4 个步骤

Reduce 阶段 2 个步骤

编程步骤

Mapper

Reducer

Driver

WordCount示例编写

第一步:数据准备

第二步:代码编写


本系列历史文章

2021年大数据Hadoop(十六):MapReduce计算模型介绍

2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation

2021年大数据Hadoop(十四):HDFS的高可用机制

2021年大数据Hadoop(十三):HDFS意想不到的其他功能

2021年大数据Hadoop(十二):HDFS的API操作

2021年大数据Hadoop(十一):HDFS的元数据辅助管理

2021年大数据Hadoop(十):HDFS的数据读写流程

2021年大数据Hadoop(九):HDFS的高级使用命令

2021年大数据Hadoop(八):HDFS的Shell命令行使用

2021年大数据Hadoop(七):HDFS分布式文件系统简介

2021年大数据Hadoop(六):全网最详细的Hadoop集群搭建

2021年大数据Hadoop(五):Hadoop架构

2021年大数据Hadoop(四):Hadoop发行版公司

2021年大数据Hadoop(三):Hadoop国内外应用

2021年大数据Hadoop(二):Hadoop发展简史和特性优点

2021年大数据Hadoop(一):Hadoop介绍

 

前言

2021年全网最详细的大数据笔记,轻松带你从入门到精通,该栏目每天更新,汇总知识分享

MapReduce编程规范及示例编写

 

编程规范

MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为2个步骤,Shuffle 阶段 4 个步骤,Reduce 阶段分为2个步骤

Map阶段2个步骤

1、设置 InputFormat 类, 读取输入文件内容,对输入文件的每一行,解析成key、value对(K1和V1)。

2、自定义map方法,每一个键值对调用一次map方法,将第一步的K1和V1结果转换成另外的 Key-Value(K2和V2)对, 输出结果。

 

Shuffle 阶段 4 个步骤

3、 对map阶段输出的k2和v2对进行分区

4、 对不同分区的数据按照相同的Key排序

5、(可选)对数据进行局部聚合, 降低数据的网络拷贝

6、对数据进行分组, 相同Key的Value放入一个集合中,得到K2和[V2]

Reduce 阶段 2 个步骤

7、对map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
8、对多个map任务的输出进行合并、排序。编写reduce方法,在此方法中将K2和[V2]进行处理,转换成新的key、value(K3和V3)输出,并把reduce的输出保存到文件中。

 

编程步骤

用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)

Mapper

  1. 自定义类继承Mapper类
  2. 重写自定义类中的map方法,在该方法中将K1和V1转为K2和V2
  3. 将生成的K2和V2写入上下文中

 

Reducer

  1. 自定义类继承Reducer类
  2. 重写Reducer中的reduce方法,在该方法中将K2和[V2]转为K3和V3
  3. 将K3和V3写入上下文中

 

Driver

整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

1、定义类,编写main方法

2、在main方法中指定以下内容:

  1. 创建建一个job任务对象
  2. 指定job所在的jar包
  3. 指定源文件的读取方式类和源文件的读取路径
  4. 指定自定义的Mapper类和K2、V2类型
  5. 指定自定义分区类(如果有的话)
  6. 指定自定义Combiner类(如果有的话)
  7. 指定自定义分组类(如果有的话)
  8. 指定自定义的Reducer类和K3、V3的数据类型
  9. 指定输出方式类和结果输出路径
  10. 将job提交到yarn集群


WordCount示例编写

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数

第一步:数据准备

1、创建一个新的文件

cd /export/server

vim wordcount.txt

2、向其中放入以下内容并保存

hello,world,hadoop

hive,sqoop,flume,hello

kitty,tom,jerry,world

hadoop

3、上传到 HDFS

hadoop fs   -mkdir -p  /input/wordcount

hadoop fs -put wordcount.txt /input/wordcoun

第二步:代码编写

1、导入maven坐标

<dependencies>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-mapreduce-client-core</artifactId>

            <version>2.7.5</version>

        </dependency>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>4.12</version>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-simple</artifactId>

            <version>1.7.25</version>

        </dependency>

    </dependencies>

2、定义一个mapper类

//首先要定义四个泛型的类型

//keyin:  LongWritable    valuein: Text

//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, Writable>{

//map方法的生命周期:  框架每传一行数据就被调用一次

//key :  这一行的起始点在文件中的偏移量

//value: 这一行的内容

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    //拿到一行数据转换为string

    String line = value.toString();

    //将这一行切分出各个单词

    String[] words = line.split(" ");

    //遍历数组,输出<单词,1>

    for(String word:words){

        context.write(new Text(word), new LongWritable (1));

    }

}

}

3、定义一个reducer类

public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次

 @Override

 protected void reduce(Text key, Iterable<LongWritable > values, Context context) throws IOException, InterruptedException {

//定义一个计数器

int count = 0;

//遍历这一组kv的所有v,累加到count中

for(LongWritable value:values){

count += value.get();

}

context.write(key, new LongWritable (count));

 }

}

4、定义一个Driver主类,用来描述job并提交job


public class WordCountRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1、创建建一个job任务对象

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration, "wordcount");



        //2、指定job所在的jar包

        job.setJarByClass(WordCountRunner.class);



        //3、指定源文件的读取方式类和源文件的读取路径

        job.setInputFormatClass(TextInputFormat.class); //按照行读取

        //TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/wordcount")); //只需要指定源文件所在的目录即可

         TextInputFormat.addInputPath(job, new Path("file:///E:\\\\input\\\\wordcount")); //只需要指定源文件所在的目录即可



        //4、指定自定义的Mapper类和K2、V2类型

        job.setMapperClass(WordCountMapper.class); //指定Mapper类

        job.setMapOutputKeyClass(Text.class); //K2类型

        job.setMapOutputValueClass(LongWritable.class);//V2类型



        //5、指定自定义分区类(如果有的话)

        //6、指定自定义分组类(如果有的话)

        //7、指定自定义的Reducer类和K3、V3的数据类型

        job.setReducerClass(WordCountReducer.class); //指定Reducer类

        job.setOutputKeyClass(Text.class); //K3类型

        job.setOutputValueClass(LongWritable.class);  //V3类型



        //8、指定输出方式类和结果输出路径

        job.setOutputFormatClass(TextOutputFormat.class);

        //TextOutputFormat.setOutputPath(job, new  Path("hdfs://node1:8020/output/wordcount")); //目标目录不能存在,否则报错

        TextOutputFormat.setOutputPath(job, new  Path("file:///E:\\\\output\\\\wordcount")); //目标目录不能存在,否则报错



        //9、将job提交到yarn集群

        boolean bl = job.waitForCompletion(true); //true表示可以看到任务的执行进度



        //10.退出执行进程

        System.exit(bl?0:1);

    }

}

 

以上是关于2021年大数据Hadoop(十七):MapReduce编程规范及示例编写的主要内容,如果未能解决你的问题,请参考以下文章

2021年大数据环境搭建:分布式环境搭建

2021年大数据HBase(十七):HBase的360度全面调优

2021年大数据Flink(三十七):​​​​​​​Table与SQL ​​​​​​案例四

2021年大数据Flink(四十七):扩展阅读  File Sink

2021年大数据ELK(二十七):数据可视化(Visualize)

2021年大数据常用语言Scala(二十七):函数式编程 聚合操作