Day10:YARN与Hive入门
Posted 保护胖丁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day10:YARN与Hive入门相关的知识,希望对你有一定的参考价值。
优秀是一种习惯
知识点01:回顾
-
为什么要设计Shuffle?
- 全局分组和排序
-
Map端的Shuffle阶段如何实现?
- Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
- MapTask将数据写入一个环形缓冲区【内存:100M】
- 阈值:80%
- 先排序:快排
- 后溢写:排序好的内容生成小文件
- Merge:将自己所有小文件合并为一个大文件,并且实现排序
- 实现每个MapTask整体有序
- 排序:插入排序
- Spill:将每个MapTask每个小的部分实现局部排序,生成多个有序的小文件
-
Reduce端的Shuffle阶段如何实现?
- Merge:将所有MapTask中属于自己的数据进行合并并排序
- 排序:插入排序
- Merge:将所有MapTask中属于自己的数据进行合并并排序
-
Combiner的功能是什么?如何实现Combiner?
- 功能:Map端聚合,利用MapTask个数远大于ReduceTask个数,MapTask提前做一次聚合
- 实现
- job.setCombinerClass(Reducer)
- 效果:减少Reduce输出的数据量,降低了Reduce负载
-
压缩的好处是什么?常见的压缩类型有哪些?MapReduce如何配置压缩?
- 优点:降低了数据存储大小,提高了IO传输速度,提高性能
- 类型:Snappy、Lzo、Lz4
- 配置
- Input:不用配置
- Map Out:配置开启和指定压缩类型
- Reduce Out:配置开启和指定压缩类型
-
Shuffle分组的规则是什么?如何自定义分组比较器?
-
规则:先调用分组比较器,如果有,直接调用比较方法,如果没有,调用K2的比较方法
-
定义
-
继承WritableComparator,重写compare
-
排序:大于、等于、小于
job.setSortCOmparatorClass
-
分组:等于、不等于
job.setGroupingComparatorClass
-
-
知识点02:目标
- MapReduce补充知识点
- 分片的规则:读取文件时,如何决定划分多少个分片,决定了MapTask个数
- MapReduce如何实现Join?
- Reduce Join
- Map Join【重点】
- YARN的介绍
- 功能与应用场景
- 架构以及MR程序的运行流程
- 资源管理和任务调度
- 数据仓库与Hive的入门
- 数据仓库
- Hive的介绍:功能与应用【重要】
- Hive的安装部署
- 案例:Wordcount、二手房统计
知识点03:MapReduce补充:分片规则
-
引入:Reduce的个数可以自己指定,Map的个数如何决定?
-
目标:了解TextInputFormat中的分片规则
-
路径
- step1:InputFormat的功能
- step2:读取数据的实现
- step3:分片的规则
-
实施
-
InputFormat的功能
- 功能一:将读取到的所有输入数据划分为多个分片Split
- 功能二:将每个分片的数据转换为KV
-
TextInputFormat读取数据的实现
- createRecordReader:真正调用读取器读取数据的方法
- LineRecordReader:真正读取器的对象【JavaBean】
- nextKeyValue:将每一条数据转换为KV结构的方法
-
TextInputFormat分片的规则
-
getSplits:用于将输入的所有数据划分为多个分片
-
规则
-
step1:判断是否分片的条件
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) 文件大小 / splitSize > 1.1
-
step2:计算splitSize大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize); 128M 1 Long.MAX_VALUE minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1) maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize
-
step3:computeSplitSize计算逻辑
Math.max(minSize, Math.min(maxSize, blockSize)) max(1,min(Long.MAX_VALUE,128M))
-
规则
- 判断当前文件的大小是否大于128M的1.1倍
- 如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建
- 如果不大于:剩下的整体作为一个分片
- 130M:1个分片
- 145M:2个
- split1:128M
- split2:17M
- 判断当前文件的大小是否大于128M的1.1倍
-
-
-
-
小结
- TextInputFormat中分片的规则是什么?
- 按照文件大小的1.1倍判断
- 大于:每128M作为一个分片
- 不大于:整体作为一个分片
- 注意:如果要干预MapTask个数,怎么干预?
- 调整minSplitSize和maxSplitSize大小
- TextInputFormat中分片的规则是什么?
知识点04:MapReduce补充:Reduce Join
-
引入:MapReduce用于实现数据统计分析,类似于SQL,如何实现Join过程?
-
目标:了解MapReduce中Reduce Join的实现
- 什么是Reduce Join,特点和应用场景是什么?
-
路径
- step1:Reduce Join的原理
- step2:Reduce Join的实现
- step3:Reduce Join的特点
-
实施
-
Reduce Join的原理
- 将Join的关联字段作为K2,将订单信息和商品的信息作为V2
- Shuffle过程中分组时候,会将相同商品id对应的商品信息和订单信息放入同一个迭代器中
- Reduce只要拼接即可
-
Reduce Join的实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @ClassName MRDriver * @Description TODO 实现ReduceJoin * @Create By Frank */ public class ReduceJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(ReduceJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath1 = new Path("datas/join/orders.txt");//读取订单数据 Path inputPath2 = new Path("datas/join/product.txt");//读取商品的数据 //设置的路径可以给目录,也可以给定文件,如果给定目录,会将目录中所有文件作为输入,但是目录中不能包含子目录 TextInputFormat.setInputPaths(job,inputPath1,inputPath2);//为当前job设置输入的路径 //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 job.setOutputKeyClass(Text.class);//设置Reduce输出的Key类型 job.setOutputValueClass(Text.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(1);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/reduceJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new ReduceJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { Text outputKey= new Text(); Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //两个文件的每一行数据,就是Value // 要判断这条数据来自哪个文件 FileSplit fileSplit = (FileSplit) context.getInputSplit();//获取这条数据对应的分片 String name = fileSplit.getPath().getName();//获取这条数据对应的文件的名称 if("orders.txt".equals(name)){ //如果这是订单的数据:1001,20150710,p0001,2 String[] split1 = value.toString().split(","); //用商品id作为Key this.outputKey.set(split1[2]); //其他信息作为Value this.outputValue.set(split1[0]+"\\t"+split1[1]+"\\t"+split1[3]); //输出 context.write(this.outputKey,this.outputValue); }else{ //这是商品数据:p0001,直升机,1000,2000 String[] split2 = value.toString().split(","); //用商品id作为key this.outputKey.set(split2[0]); //用商品名称作为Value this.outputValue.set(split2[1]); //输出 context.write(this.outputKey,this.outputValue); } } } /** * @ClassName MRReducer * @Description TODO MapReduce模板的Reducer的类 * 输入的KV类型:由Map的输出决定,保持一致 * 输出的KV类型:由reduce方法中谁作为key,谁作为Value决定 */ public static class MRReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //传进来的是每个商品id对应的订单信息和商品名称 StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { //将订单信息和商品的名称进行拼接 stringBuilder.append(value.toString()+"\\t"); } //输出 context.write(key,new Text(stringBuilder.toString())); } } }
-
Reduce Join的特点
- 利用了Shuffle中全局分组实现两份数据的Join
- Join发生在Reduce端
- 必须经过Shuffle
- 应用:适合于大数据join大数据
-
-
小结
- 什么是Reduce Join,特点和应用场景是什么?
- 利用SHuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中
- 特点:必须经过shuffle
- 应用:大数据join大数据
- 什么是Reduce Join,特点和应用场景是什么?
知识点05:MapReduce补充:Map Join
-
引入:Reduce Join必须经过Shuffle,有没有更好的方案?
-
目标:了解MapReduce中Map Join的实现
-
路径
- step1:Map Join的原理
- step2:Map Join的实现
- step3:Map Join的特点
-
实施
- Map Join的原理
-
在每一个MapTask 的内存中放入一份完整的商品表,前提:商品表的数据比较小
-
每个MapTask处理一部分订单表,直接在MapTask所在的内存中订单表的每一个小部分与商品表的完整的数据进行join
-
Map Join的实现
- Mapper类或者Reduce类总共有3个方法
- setup:初始化方法,在Mapper构建实例的时候会被调用一次
- map/reduce
- cleanUp:关闭方法,用于释放资源,最后释放之前会被调用一次
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; /** * @ClassName MRDriver * @Description TODO 这是MapReduce程序的Driver类的模板 * @Date 2020/5/30 10:34 * @Create By Frank */ public class MapJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(MapJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath = new Path("datas/join/orders.txt");//将大的数据进行读取 //将订单作为输入 TextInputFormat.setInputPaths(job,inputPath);//为当前job设置输入的路径 //将商品表放入分布式缓存中 job.addCacheFile(new Path("datas/join/product.txt").toUri()); //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce // job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 // job.setOutputKeyClass(NullWritable.class);//设置Reduce输出的Key类型 // job.setOutputValueClass(NullWritable.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(0);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/mapJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new MapJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { //获取分布式缓存中的数据,存入Map集合,Key是商品id,Value是商品名称 Map<String,String> map = new HashMap<String, String>(); /** * Map类中有三个方法 * setup:在调用map之前会调用一次,类似于初始化的方法 * map:实现Map处理逻辑的方法 * cleanup:Map结束以后会调用的方法,相当于close方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //将缓存中的数据读取出来,封装到Map集合中 URI[] cacheFiles = context.getCacheFiles(); //打开这个缓存的文件 BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath())); //将每一行的内容,封装到Map集合中 String line = null; while(StringUtils.isNotBlank(line = bufferedReader.readLine())){ //分割将商品id和商品名称放入Map集合 String pid = line.split(",")[0]; String pname = line.split(",")[1]; map.put(pid,pname); } } @Overri
以上是关于Day10:YARN与Hive入门的主要内容,如果未能解决你的问题,请参考以下文章
Hive函数入门--案例:UDF实现手机号加密--代码实现与效果演示
- Mapper类或者Reduce类总共有3个方法