Day10:YARN与Hive入门

Posted 保护胖丁

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day10:YARN与Hive入门相关的知识,希望对你有一定的参考价值。

                                                            优秀是一种习惯

知识点01:回顾

  1. 为什么要设计Shuffle?

    • 全局分组和排序
  2. Map端的Shuffle阶段如何实现?

    • Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
      • MapTask将数据写入一个环形缓冲区【内存:100M】
      • 阈值:80%
        • 先排序:快排
        • 后溢写:排序好的内容生成小文件
    • Merge:将自己所有小文件合并为一个大文件,并且实现排序
      • 实现每个MapTask整体有序
      • 排序:插入排序
  3. Reduce端的Shuffle阶段如何实现?

    • Merge:将所有MapTask中属于自己的数据进行合并并排序
      • 排序:插入排序
  4. Combiner的功能是什么?如何实现Combiner?

    • 功能:Map端聚合,利用MapTask个数远大于ReduceTask个数,MapTask提前做一次聚合
    • 实现
      • job.setCombinerClass(Reducer)
    • 效果:减少Reduce输出的数据量,降低了Reduce负载
  5. 压缩的好处是什么?常见的压缩类型有哪些?MapReduce如何配置压缩?

    • 优点:降低了数据存储大小,提高了IO传输速度,提高性能
    • 类型:Snappy、Lzo、Lz4
    • 配置
      • Input:不用配置
      • Map Out:配置开启和指定压缩类型
      • Reduce Out:配置开启和指定压缩类型
  6. Shuffle分组的规则是什么?如何自定义分组比较器?

    • 规则:先调用分组比较器,如果有,直接调用比较方法,如果没有,调用K2的比较方法

    • 定义

      • 继承WritableComparator,重写compare

      • 排序:大于、等于、小于

        job.setSortCOmparatorClass
        
      • 分组:等于、不等于

        job.setGroupingComparatorClass
        

知识点02:目标

  1. MapReduce补充知识点
    • 分片的规则:读取文件时,如何决定划分多少个分片,决定了MapTask个数
    • MapReduce如何实现Join?
      • Reduce Join
      • Map Join【重点】
  2. YARN的介绍
    • 功能与应用场景
    • 架构以及MR程序的运行流程
    • 资源管理和任务调度
  3. 数据仓库与Hive的入门
    • 数据仓库
    • Hive的介绍:功能与应用【重要】
    • Hive的安装部署
    • 案例:Wordcount、二手房统计

知识点03:MapReduce补充:分片规则

  • 引入:Reduce的个数可以自己指定,Map的个数如何决定?

  • 目标了解TextInputFormat中的分片规则

  • 路径

    • step1:InputFormat的功能
    • step2:读取数据的实现
    • step3:分片的规则
  • 实施

    • InputFormat的功能

      • 功能一:将读取到的所有输入数据划分为多个分片Split
      • 功能二:将每个分片的数据转换为KV
    • TextInputFormat读取数据的实现

      • createRecordReader:真正调用读取器读取数据的方法
      • LineRecordReader:真正读取器的对象【JavaBean】
        • nextKeyValue:将每一条数据转换为KV结构的方法
    • TextInputFormat分片的规则

      • getSplits:用于将输入的所有数据划分为多个分片

      • 规则

        • step1:判断是否分片的条件

          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)
              	文件大小 /  splitSize >  1.1
          
        • step2:计算splitSize大小

          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          									128M		1		Long.MAX_VALUE
                                                  
          minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1)
          maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize                      	
          
        • step3:computeSplitSize计算逻辑

          Math.max(minSize, Math.min(maxSize, blockSize))
          max(1,min(Long.MAX_VALUE,128M))
          
        • 规则

          • 判断当前文件的大小是否大于128M的1.1倍
            • 如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建
            • 如果不大于:剩下的整体作为一个分片
          • 130M:1个分片
          • 145M:2个
            • split1:128M
            • split2:17M
  • 小结

    • TextInputFormat中分片的规则是什么?
      • 按照文件大小的1.1倍判断
      • 大于:每128M作为一个分片
      • 不大于:整体作为一个分片
    • 注意:如果要干预MapTask个数,怎么干预?
      • 调整minSplitSize和maxSplitSize大小

知识点04:MapReduce补充:Reduce Join

  • 引入:MapReduce用于实现数据统计分析,类似于SQL,如何实现Join过程?

  • 目标了解MapReduce中Reduce Join的实现

    • 什么是Reduce Join,特点和应用场景是什么?
  • 路径

    • step1:Reduce Join的原理
    • step2:Reduce Join的实现
    • step3:Reduce Join的特点
  • 实施

    • Reduce Join的原理
      在这里插入图片描述

      • 将Join的关联字段作为K2,将订单信息和商品的信息作为V2
      • Shuffle过程中分组时候,会将相同商品id对应的商品信息和订单信息放入同一个迭代器中
      • Reduce只要拼接即可
    • Reduce Join的实现

    在这里插入图片描述

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * @ClassName MRDriver
     * @Description TODO 实现ReduceJoin
     * @Create By     Frank
     */
    public class ReduceJoinMr extends Configured implements Tool {
    
        /**
         * 用于将Job的代码封装
         * @param args
         * @return
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            //todo:1-构建一个Job
            Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置
            job.setJarByClass(ReduceJoinMr.class);//指定可以运行的类型
            //todo:2-配置这个Job
            //input
    //        job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat
            Path inputPath1 = new Path("datas/join/orders.txt");//读取订单数据
            Path inputPath2 = new Path("datas/join/product.txt");//读取商品的数据
            //设置的路径可以给目录,也可以给定文件,如果给定目录,会将目录中所有文件作为输入,但是目录中不能包含子目录
            TextInputFormat.setInputPaths(job,inputPath1,inputPath2);//为当前job设置输入的路径
    
            //map
            job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法
            job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型
            job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型
    
            //shuffle
    //        job.setPartitionerClass(HashPartitioner.class);//自定义分区
    //        job.setGroupingComparatorClass(null);//自定义分组的方式
    //        job.setSortComparatorClass(null);//自定义排序的方式
    
            //reduce
            job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法
            job.setOutputKeyClass(Text.class);//设置Reduce输出的Key类型
            job.setOutputValueClass(Text.class);//设置Reduce输出的Value类型
            job.setNumReduceTasks(1);//设置ReduceTask的个数,默认为1
    
            //output:输出目录默认不能提前存在
    //        job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat
            Path outputPath = new Path("datas/output/join/reduceJoin");//用程序的第三个参数作为输出
            //解决输出目录提前存在,不能运行的问题,提前将目前删掉
            //构建一个HDFS的文件系统
            FileSystem hdfs = FileSystem.get(this.getConf());
            //判断输出目录是否存在,如果存在就删除
            if(hdfs.exists(outputPath)){
                hdfs.delete(outputPath,true);
            }
            TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径
    
            //todo:3-提交运行Job
            return job.waitForCompletion(true) ? 0:-1;
        }
    
        /**
         * 程序的入口,调用run方法
         * @param args
         */
        public static void main(String[] args) throws Exception {
            //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置
            Configuration conf = new Configuration();
            //t通过Toolruner的run方法调用当前类的run方法
            int status = ToolRunner.run(conf, new ReduceJoinMr(), args);
            //退出程序
            System.exit(status);
        }
    
    
        /**
         * @ClassName MRMapper
         * @Description TODO 这是MapReduce模板的Map类
         *      输入的KV类型:由inputformat决定,默认是TextInputFormat
         *      输出的KV类型:由map方法中谁作为key,谁作为Value决定
         */
        public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> {
    
            Text outputKey= new Text();
            Text outputValue = new Text();
    
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //两个文件的每一行数据,就是Value
                // 要判断这条数据来自哪个文件
                FileSplit fileSplit = (FileSplit) context.getInputSplit();//获取这条数据对应的分片
                String name = fileSplit.getPath().getName();//获取这条数据对应的文件的名称
                if("orders.txt".equals(name)){
                    //如果这是订单的数据:1001,20150710,p0001,2
                    String[] split1 = value.toString().split(",");
                    //用商品id作为Key
                    this.outputKey.set(split1[2]);
                    //其他信息作为Value
                    this.outputValue.set(split1[0]+"\\t"+split1[1]+"\\t"+split1[3]);
                    //输出
                    context.write(this.outputKey,this.outputValue);
                }else{
                    //这是商品数据:p0001,直升机,1000,2000
                    String[] split2 = value.toString().split(",");
                    //用商品id作为key
                    this.outputKey.set(split2[0]);
                    //用商品名称作为Value
                    this.outputValue.set(split2[1]);
                    //输出
                    context.write(this.outputKey,this.outputValue);
                }
            }
        }
    
    
    
        /**
         * @ClassName MRReducer
         * @Description TODO MapReduce模板的Reducer的类
         *      输入的KV类型:由Map的输出决定,保持一致
         *      输出的KV类型:由reduce方法中谁作为key,谁作为Value决定
         */
        public static class MRReducer extends Reducer<Text,Text,Text,Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //传进来的是每个商品id对应的订单信息和商品名称
                StringBuilder stringBuilder = new StringBuilder();
                for (Text value : values) {
                    //将订单信息和商品的名称进行拼接
                    stringBuilder.append(value.toString()+"\\t");
                }
                //输出
                context.write(key,new Text(stringBuilder.toString()));
    
            }
        }
    
    
    }
    
    
    • Reduce Join的特点

      • 利用了Shuffle中全局分组实现两份数据的Join
      • Join发生在Reduce端
      • 必须经过Shuffle
      • 应用:适合于大数据join大数据
  • 小结

    • 什么是Reduce Join,特点和应用场景是什么?
      • 利用SHuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中
      • 特点:必须经过shuffle
      • 应用:大数据join大数据

知识点05:MapReduce补充:Map Join

  • 引入:Reduce Join必须经过Shuffle,有没有更好的方案?

  • 目标了解MapReduce中Map Join的实现

  • 路径

    • step1:Map Join的原理
    • step2:Map Join的实现
    • step3:Map Join的特点
  • 实施

    • Map Join的原理

    在这里插入图片描述

    • 在每一个MapTask 的内存中放入一份完整的商品表,前提:商品表的数据比较小

    • 每个MapTask处理一部分订单表,直接在MapTask所在的内存中订单表的每一个小部分与商品表的完整的数据进行join

    • Map Join的实现

      • Mapper类或者Reduce类总共有3个方法
        • setup:初始化方法,在Mapper构建实例的时候会被调用一次
        • map/reduce
        • cleanUp:关闭方法,用于释放资源,最后释放之前会被调用一次

      在这里插入图片描述

      
      import org.apache.commons.lang.StringUtils;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.conf.Configured;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.NullWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.Reducer;
      import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
      import org.apache.hadoop.util.Tool;
      import org.apache.hadoop.util.ToolRunner;
      
      import java.io.BufferedReader;
      import java.io.FileReader;
      import java.io.IOException;
      import java.net.URI;
      import java.util.HashMap;
      import java.util.Map;
      
      /**
       * @ClassName MRDriver
       * @Description TODO 这是MapReduce程序的Driver类的模板
       * @Date 2020/5/30 10:34
       * @Create By     Frank
       */
      public class MapJoinMr extends Configured implements Tool {
      
          /**
           * 用于将Job的代码封装
           * @param args
           * @return
           * @throws Exception
           */
          @Override
          public int run(String[] args) throws Exception {
              //todo:1-构建一个Job
              Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置
              job.setJarByClass(MapJoinMr.class);//指定可以运行的类型
              //todo:2-配置这个Job
              //input
      //        job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat
              Path inputPath = new Path("datas/join/orders.txt");//将大的数据进行读取
              //将订单作为输入
              TextInputFormat.setInputPaths(job,inputPath);//为当前job设置输入的路径
              //将商品表放入分布式缓存中
              job.addCacheFile(new Path("datas/join/product.txt").toUri());
      
              //map
              job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法
              job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型
              job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型
      
              //shuffle
      //        job.setPartitionerClass(HashPartitioner.class);//自定义分区
      //        job.setGroupingComparatorClass(null);//自定义分组的方式
      //        job.setSortComparatorClass(null);//自定义排序的方式
      
              //reduce
      //        job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法
      //        job.setOutputKeyClass(NullWritable.class);//设置Reduce输出的Key类型
      //        job.setOutputValueClass(NullWritable.class);//设置Reduce输出的Value类型
              job.setNumReduceTasks(0);//设置ReduceTask的个数,默认为1
      
              //output:输出目录默认不能提前存在
      //        job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat
              Path outputPath = new Path("datas/output/join/mapJoin");//用程序的第三个参数作为输出
              //解决输出目录提前存在,不能运行的问题,提前将目前删掉
              //构建一个HDFS的文件系统
              FileSystem hdfs = FileSystem.get(this.getConf());
              //判断输出目录是否存在,如果存在就删除
              if(hdfs.exists(outputPath)){
                  hdfs.delete(outputPath,true);
              }
              TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径
      
              //todo:3-提交运行Job
              return job.waitForCompletion(true) ? 0:-1;
          }
      
          /**
           * 程序的入口,调用run方法
           * @param args
           */
          public static void main(String[] args) throws Exception {
              //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置
              Configuration conf = new Configuration();
              //t通过Toolruner的run方法调用当前类的run方法
              int status = ToolRunner.run(conf, new MapJoinMr(), args);
              //退出程序
              System.exit(status);
          }
      
      
          /**
           * @ClassName MRMapper
           * @Description TODO 这是MapReduce模板的Map类
           *      输入的KV类型:由inputformat决定,默认是TextInputFormat
           *      输出的KV类型:由map方法中谁作为key,谁作为Value决定
           */
          public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> {
              //获取分布式缓存中的数据,存入Map集合,Key是商品id,Value是商品名称
              Map<String,String> map = new HashMap<String, String>();
      
      
              /**
               * Map类中有三个方法
               *      setup:在调用map之前会调用一次,类似于初始化的方法
               *      map:实现Map处理逻辑的方法
               *      cleanup:Map结束以后会调用的方法,相当于close方法
               * @param context
               * @throws IOException
               * @throws InterruptedException
               */
              @Override
              protected void setup(Context context) throws IOException, InterruptedException {
                  //将缓存中的数据读取出来,封装到Map集合中
                  URI[] cacheFiles = context.getCacheFiles();
                  //打开这个缓存的文件
                  BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath()));
                  //将每一行的内容,封装到Map集合中
                  String line = null;
                  while(StringUtils.isNotBlank(line = bufferedReader.readLine())){
                      //分割将商品id和商品名称放入Map集合
                      String pid = line.split(",")[0];
                      String pname = line.split(",")[1];
                      map.put(pid,pname);
                  }
              }
      
              @Overri

      以上是关于Day10:YARN与Hive入门的主要内容,如果未能解决你的问题,请参考以下文章

      Hive函数入门--案例:UDF实现手机号加密--代码实现与效果演示

      Day12:Hive特殊使用与函数

      10.)yarn的安装及详细的使用入门图文教程

      Day11:Hive的基本语法与使用

      Python入门学习-DAY37-进程池与线程池协程gevent模块

      Logistics_Day15:ClickHouse 存储引擎