hadoop离线day04--Hadoop MapReduce

Posted Vics异地我就

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop离线day04--Hadoop MapReduce相关的知识,希望对你有一定的参考价值。

目录

hadoop离线day04--Hadoop MapReduce

今日内容大纲

初始MapReduce

MapReduce思想

Hadoop MapReduce 设计构思

如何面对大数据场景

构建了函数式编程模型Map Reduce

统一构架,隐藏系统层细节

MapReduce框架和编程规范

什么叫做编程规范。

从代码层面看(静态的)

从运行层面看(动态的)

MapReduce入门案例--Wordcount单词统计

Reducer阶段

运行代码段

MapReduce程序执行方式

运行在什么模式如何决定呢?

MapReduce集群模式

MapReduce本地模式

MapReduce的执行流程梳理

MapReduce输入和输出


hadoop离线day04--Hadoop MapReduce


今日内容大纲

初始MapReduce
    MapReduce背后的思想  先分再合,分而治之
    MapReduce设计构思
    MapReduce编程规范 进程
    MapReduce入门案例--Wordcount
        统计单词次数 业务及其简单
        背后技术流程学会 掌握MapReduce80%
   MapReduce程序执行
       本地模式
       yarn集群模式
MapReduce基本流程
    input---->mr---->output  梳理输入和输出
    基本整体流程梳理


初始MapReduce

  • MapReduce思想

    • 核心:先分再合,分而治之

    • 使用场景:面向复杂的任务、庞大的任务

    • 步骤

      • 分的阶段(局部并行计算)--map

        把复杂的任务拆分成若干个小的任务。
        拆分的目的以并行方式处理小人物提高效率。
        #前提:任务可以拆分,拆分之后没有依赖关系。
        结果:每个任务处理完都是一个局部的结果。
         
        map侧重于映射(对应关系)  任务1-->结果1  任务2-->结果2
      • 汇总阶段(全局汇总计算)--reduce

        把上一个分的阶段局部结果进行全局汇总 得到最终结果。
        ​
        reduce指的是结果数量的减少 汇总。
  • Hadoop MapReduce 设计构思

    • 如何面对大数据场景

      #使用MapReduce思路来处理大数据。
      ​
      先把数据集拆分若干个小的数据集,前提是可以拆分并且拆分之后没有依赖。
      拆分之后可以并行计算提高计算。
      ​
      再通过全局汇总计算得出最终结果。
    • 构建了函数式编程模型Map Reduce

      #函数本质就是映射。
      f(x)=2x+1
          当x=1 f(1)=3
          当x=2 f(2)=5
      x-->f(x) 一一对应的映射关系。 
      ​
      #对应MapReduce来说 每个阶段都是输入数据经过处理对应着输出。
       MapReduce处理的数据类型是<key,value>键值对。
       实际使用中 考虑每个阶段输入输出 key value是什么。
       
      ​
    • 统一构架,隐藏系统层细节

      精准的把技术问题和业务问题区分。  技术是通用的 业务不通用的。
          hadoop实现了底层所有的技术问题。 --->80%代码   怎么做(how to do)
          用户实现业务问题               --->20%代码    做什么(what need to do)
          
      使用简单不代表技术简单 只能说MapReduce底层封装太牛逼。
  • MapReduce框架和编程规范

    • 什么叫做编程规范。

      因为最终MapReduce程序需要用户的代码和MapReduce自己实现的代码整合在一起 才能叫做完整MapReduce程序员。
      所以MapReduce需要一定的编程规范来要求用户如何实现自己的代码。
    • 从代码层面看(静态的)

      • 类1--->继承Mapper 负责map阶段业务逻辑处理

      • 类2--->继承Reducer 负责reduce阶段业务逻辑处理

      • 类3(main方法)--->客户端驱动类 负责设定 组装 拼接MapReduce程序 设置各种参数。

    • 从运行层面看(动态的)

      • maptask map阶段运行的task 处理map阶段的任务

      • reducetask reduce阶段运行的task 处理reduce阶段的任务

      • MrAppMaster 程序内部的老大 负责资源申请和task监督。

  • MapReduce入门案例--Wordcount单词统计

    • 背景

      网页倒排索引 统计关键字在页面中出现的次数。
    • 业务需求

      统计一个文件中每个单词出现的总次数。
    • mapper阶段

      mapper阶段的代码需要继承mapper类 提供了3个方法可以在子类中覆盖重写。

      /**
       * @description: MapReduce程序中mapper阶段运行的类型  对应着maptask
       * @author: Itcast
       *
       * KEYIN:  表示map阶段输入的kv中的k类型  在默认机制下 是每行起始位置偏移量  long
       * VALUEIN:表示map阶段输入的kv中的v类型  在默认机制下 是每行的内容    string
       *         todo  mapreduce默认读取数据组件  TextInputFormat类 :一行一行读取数据  按行读。
       *               key:每行起始位置偏移量
       *               value:这一行的内容
       *               <0,hello allen apple>
       *         todo long、string是java封装的类型 hadoop认为其类型在序列化的时候效率不高  比较臃肿
       *              hadoop实现了自己的序列化机制 Writable
       *              基于这个Writable实现了对应的数据类型
       *              long----->longWritable
       *              int------>intWritable
       *              String---->Text
       *              null------>nullWritable
       * KEYOUT:  表示map阶段输出的kv中的k类型   跟业务相关 本需求中 都是单词作为key  String text
       * VALUEOUT 表示map阶段输出的kv中的v类型   跟业务相关 本需求中  都是单词次数1  int
       */
      public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
          /**
           *  todo map方法是mapper阶段的核心方法  整个业务逻辑就在该方法中实现
           *       Q:map方法是如何调用的。
           *       TextInputFormat读取一行数据 返回一个kv对 调用一次map方法
           */
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              //第一行内容  <0,hello hive hello>
              //拿到一行内容进行切割
              String line = value.toString();
              String[] words = line.split("\\\\s+"); //[hello,hive,hello]
              //遍历数组
              for (String word : words) {
                  //输出数据 把每个单词标记1   <单词,1>
                  //使用框架提供的上下文对象 进行数据的输出
                  context.write(new Text(word),new IntWritable(1));//<hello,1> <hive,1> <hello,1>
              }
          }
      }

       

    • Reducer阶段

      /**
       * @description: MapReduce程序中reducer阶段运行的类型  对应着reducetask
       * @author: Itcast
       * todo reduce输入kv类型 就是map的输出的kv类型
       * KEYIN  对应着map的输出key 本需求中是单词  Text
       * VALUEIN 对应着map的输出value 本需求中是单词次数1  IntWritable
       *
       * KEYOUT reduce阶段最终的输出的key 也是mr程序的最终的输出  跟需求相关  本需求还是单词 Text
       * VALUEOUT reduce阶段最终的输出的value 也是mr程序的最终的输出  跟需求相关  本需求还是单词的总次数  IntWritable
       */
      public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
          /**
           * todo reduce方法就是reducer阶段核心业务逻辑处理的方法  该方法是如何调用的?  一组一调用
           * Q: 当所有的数据来到reducer之后 内部有什么行为?
           *    <hello,1><hadoop,1><hello,1><hello,1><hadoop,1>
           * todo 1、所有的数据排序  排序规则:key的字典序  a-z
           *    <hadoop,1><hadoop,1><hello,1><hello,1><hello,1>
           *
           * todo 2、分组grouping  分组规则:key相同的分为一组
           *      <hadoop,1><hadoop,1>
           *      <hello,1><hello,1><hello,1>
           * todo 3、每组构成一个新的kv对 去调用reduce方法
           *      新key: 该组共同的key
           *      新value: 该组所有的value组成的一个迭代器Iterable(理解为类似于集合数据结构)
           *
           *      <hadoop,1><hadoop,1>       ---->  <hadoop,Iterable[1,1]>
           *      <hello,1><hello,1><hello,1> --->  <hello,Iterable[1,1,1]>
           */
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              //定义个统计变量
              int count = 0;
              //遍历迭代器
              for (IntWritable value : values) {
                  //累加
                  count += value.get();
              }
              //使用上下文输出结果
              context.write(key,new IntWritable(count));
          }
      }

      运行代码段

    • /**
       * @description:  todo 使用Tool工具类进行驱动类封装和job的提交
       * @author: Itcast
       */
      public class WordCountDriver_v2 extends Configured implements Tool {
      
      
          public static void main(String[] args) throws Exception {
              //配置对象
              Configuration conf = new Configuration();
              //todo 使用工具类ToolRunner提交程序  ctrl+p 提示参数
              int status = ToolRunner.run(conf, new WordCountDriver_v2(), args);
              //退出
              System.exit(status);
          }
      
          @Override
          public int run(String[] args) throws Exception {
              //mr作业的核心类 Job 一个job就表示一次作业
              Job job = Job.getInstance(getConf(), "word_count");
      
              //设置本次mr作业运行的主类
              job.setJarByClass(WordCountDriver_v2.class);
      
              //设置本次mr作业的mapper类 reducer类
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
      
              //设置mapper阶段输出的key value类型  叫做程序中间的输出
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
      
              //设置Reducer阶段输出的key value类型  也就是程序最终的输出
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
      
              //设置本次mr程序输入 输出的路径
              FileInputFormat.setInputPaths(job,new Path(args[0]));
              FileOutputFormat.setOutputPath(job,new Path(args[1]));
      
              //经过上述步骤执行 相当于将本次作业的job对象描述清楚了 接下来需要提交作业
      //        job.submit();
              //使用下面方式提交  指定参数为true 驱动类就会追踪打印job的执行的信息
              boolean result = job.waitForCompletion(true);
              // 最终程序退出  应该要和执行结果绑定
              // 如果result为true 表示job succeeded 退出状态码为0  否则就是1 表示程序执行异常
              return result ? 0:1;
          }
      }
      

       

  • MapReduce程序执行方式

    • MapReduce分为两种执行方式,本地模式local ,yarn集群模式。

    • 所谓的执行方式之间的区别就是MapReduce程序在执行的时候,谁给分配资源。

      • 本地模式:使用当前机器的本地环境,单机模拟分布式运行。由操作系统提供资源。

        适合开发测试 debug方便  单机的 没有分布式的魅力了
      • 集群模式:把mr程序打成jar提交给yarn集群 分配资源 进行分布式计算。

        适合生产环境线上环境   分布式程序
    • 运行在什么模式如何决定呢?

      • mapreduce.framework.name = local 本地模式

      • mapreduce.framework.name= yarn 集群模式

      • 如果不指定默认是什么模式 是local模式。

      • 此外还需要考虑执行环境中有没有配置 mapred-site.xml 会覆盖默认的。

    • MapReduce集群模式

      • step1: 代码中设置mapreduce.framework.name= yarn

      • step2:将程序打成jar包

      • step3:上传jar包到linux上(hadoop环境)

      • step4:使用命令提交yarn 保证yarn集群启动哦

        #mr程序提交
        hadoop jar xxx.jar [mainclass args1 args2] 
        yarn jar xxx.jar [mainclass args1 args2] 
        ​
        ​
        #栗子
        hadoop jar example-mr-1.0.jar /wordcount/input /wordcount/output
    • MapReduce本地模式

      • step1:mapreduce.framework.name=local

      • step2:输入输出路径使用本地文件系统 配置参数

      • step3:直接右键运行main方法。

    • Q:如何去判断一个MapReduce运行的是哪种模式?

      • 决定性因素是:mapreduce.framework.name 除了用户代码中设置外 一定要考虑执行环境。

        比如提交jar的机器上如果有安装hadoop并且配置了环境变量 环境变量会生效

      • 直接去yarn的页面上查看 只要在yarn上运行的 一定有执行记录 雁过留痕。

      • 看作业的job编号

        • job_1621761293130_0003

        • job_local332454745_0001


MapReduce的执行流程梳理

  • MapReduce输入和输出

  • 啥都不说了先上图~ 对这个步骤进行简单的解释~ 具体更详细的放在后续更新
    •  

    • 数据都是以<key,value>键值对的形式存在的。不管是输入还是输出。

    • 在实际编程中,也需要考虑key value的类型

      • 有些是默认组件行为决定的

      • 有些是业务决定的。

    • 关于inputpath

      • 指向的是一个文件,mr就处理这一个文件

      • 指向的是一个目录,mr就处理该目录下所有的文件 整体当做数据集处理。

    • 关于outputpath

      • 要求指定的目录为空目录 不能够存储 否则执行校验失败

      FileAlreadyExistsException: Output directory file:/D:/datasets/wordcount/output already exists

以上是关于hadoop离线day04--Hadoop MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

hadoop离线day06--Hadoop MapReduceHDFS高阶

04Hadoop中的setPartitionerClass/SortComparator/GroupingComparator问题

hadoop离线day03--Hadoop HDFS

hadoop离线day02--Apache Hadoop

hadoop离线day05--Hadoop MapReduce

hadoop离线day05--Hadoop MapReduce