hadoop离线day04--Hadoop MapReduce
Posted Vics异地我就
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop离线day04--Hadoop MapReduce相关的知识,希望对你有一定的参考价值。
目录
hadoop离线day04--Hadoop MapReduce
hadoop离线day04--Hadoop MapReduce
今日内容大纲
初始MapReduce
MapReduce背后的思想 先分再合,分而治之
MapReduce设计构思
MapReduce编程规范 进程
MapReduce入门案例--Wordcount
统计单词次数 业务及其简单
背后技术流程学会 掌握MapReduce80%
MapReduce程序执行
本地模式
yarn集群模式
MapReduce基本流程
input---->mr---->output 梳理输入和输出
基本整体流程梳理
初始MapReduce
-
MapReduce思想
-
核心:先分再合,分而治之。
-
使用场景:面向复杂的任务、庞大的任务
-
步骤
-
分的阶段(局部并行计算)--map
把复杂的任务拆分成若干个小的任务。 拆分的目的以并行方式处理小人物提高效率。 #前提:任务可以拆分,拆分之后没有依赖关系。 结果:每个任务处理完都是一个局部的结果。 map侧重于映射(对应关系) 任务1-->结果1 任务2-->结果2
-
汇总阶段(全局汇总计算)--reduce
把上一个分的阶段局部结果进行全局汇总 得到最终结果。 reduce指的是结果数量的减少 汇总。
-
-
-
Hadoop MapReduce 设计构思
-
如何面对大数据场景
#使用MapReduce思路来处理大数据。 先把数据集拆分若干个小的数据集,前提是可以拆分并且拆分之后没有依赖。 拆分之后可以并行计算提高计算。 再通过全局汇总计算得出最终结果。
-
构建了函数式编程模型Map Reduce
#函数本质就是映射。 f(x)=2x+1 当x=1 f(1)=3 当x=2 f(2)=5 x-->f(x) 一一对应的映射关系。 #对应MapReduce来说 每个阶段都是输入数据经过处理对应着输出。 MapReduce处理的数据类型是<key,value>键值对。 实际使用中 考虑每个阶段输入输出 key value是什么。
-
统一构架,隐藏系统层细节
精准的把技术问题和业务问题区分。 技术是通用的 业务不通用的。 hadoop实现了底层所有的技术问题。 --->80%代码 怎么做(how to do) 用户实现业务问题 --->20%代码 做什么(what need to do) 使用简单不代表技术简单 只能说MapReduce底层封装太牛逼。
-
-
MapReduce框架和编程规范
-
什么叫做编程规范。
因为最终MapReduce程序需要用户的代码和MapReduce自己实现的代码整合在一起 才能叫做完整MapReduce程序员。 所以MapReduce需要一定的编程规范来要求用户如何实现自己的代码。
-
从代码层面看(静态的)
-
类1--->继承Mapper 负责map阶段业务逻辑处理
-
类2--->继承Reducer 负责reduce阶段业务逻辑处理
-
类3(main方法)--->客户端驱动类 负责设定 组装 拼接MapReduce程序 设置各种参数。
-
-
从运行层面看(动态的)
-
maptask map阶段运行的task 处理map阶段的任务
-
reducetask reduce阶段运行的task 处理reduce阶段的任务
-
MrAppMaster 程序内部的老大 负责资源申请和task监督。
-
-
-
MapReduce入门案例--Wordcount单词统计
-
背景
网页倒排索引 统计关键字在页面中出现的次数。
-
业务需求
统计一个文件中每个单词出现的总次数。
-
mapper阶段
mapper阶段的代码需要继承mapper类 提供了3个方法可以在子类中覆盖重写。
/** * @description: MapReduce程序中mapper阶段运行的类型 对应着maptask * @author: Itcast * * KEYIN: 表示map阶段输入的kv中的k类型 在默认机制下 是每行起始位置偏移量 long * VALUEIN:表示map阶段输入的kv中的v类型 在默认机制下 是每行的内容 string * todo mapreduce默认读取数据组件 TextInputFormat类 :一行一行读取数据 按行读。 * key:每行起始位置偏移量 * value:这一行的内容 * <0,hello allen apple> * todo long、string是java封装的类型 hadoop认为其类型在序列化的时候效率不高 比较臃肿 * hadoop实现了自己的序列化机制 Writable * 基于这个Writable实现了对应的数据类型 * long----->longWritable * int------>intWritable * String---->Text * null------>nullWritable * KEYOUT: 表示map阶段输出的kv中的k类型 跟业务相关 本需求中 都是单词作为key String text * VALUEOUT 表示map阶段输出的kv中的v类型 跟业务相关 本需求中 都是单词次数1 int */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /** * todo map方法是mapper阶段的核心方法 整个业务逻辑就在该方法中实现 * Q:map方法是如何调用的。 * TextInputFormat读取一行数据 返回一个kv对 调用一次map方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //第一行内容 <0,hello hive hello> //拿到一行内容进行切割 String line = value.toString(); String[] words = line.split("\\\\s+"); //[hello,hive,hello] //遍历数组 for (String word : words) { //输出数据 把每个单词标记1 <单词,1> //使用框架提供的上下文对象 进行数据的输出 context.write(new Text(word),new IntWritable(1));//<hello,1> <hive,1> <hello,1> } } }
-
Reducer阶段
/** * @description: MapReduce程序中reducer阶段运行的类型 对应着reducetask * @author: Itcast * todo reduce输入kv类型 就是map的输出的kv类型 * KEYIN 对应着map的输出key 本需求中是单词 Text * VALUEIN 对应着map的输出value 本需求中是单词次数1 IntWritable * * KEYOUT reduce阶段最终的输出的key 也是mr程序的最终的输出 跟需求相关 本需求还是单词 Text * VALUEOUT reduce阶段最终的输出的value 也是mr程序的最终的输出 跟需求相关 本需求还是单词的总次数 IntWritable */ public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { /** * todo reduce方法就是reducer阶段核心业务逻辑处理的方法 该方法是如何调用的? 一组一调用 * Q: 当所有的数据来到reducer之后 内部有什么行为? * <hello,1><hadoop,1><hello,1><hello,1><hadoop,1> * todo 1、所有的数据排序 排序规则:key的字典序 a-z * <hadoop,1><hadoop,1><hello,1><hello,1><hello,1> * * todo 2、分组grouping 分组规则:key相同的分为一组 * <hadoop,1><hadoop,1> * <hello,1><hello,1><hello,1> * todo 3、每组构成一个新的kv对 去调用reduce方法 * 新key: 该组共同的key * 新value: 该组所有的value组成的一个迭代器Iterable(理解为类似于集合数据结构) * * <hadoop,1><hadoop,1> ----> <hadoop,Iterable[1,1]> * <hello,1><hello,1><hello,1> ---> <hello,Iterable[1,1,1]> */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义个统计变量 int count = 0; //遍历迭代器 for (IntWritable value : values) { //累加 count += value.get(); } //使用上下文输出结果 context.write(key,new IntWritable(count)); } }
运行代码段
-
/** * @description: todo 使用Tool工具类进行驱动类封装和job的提交 * @author: Itcast */ public class WordCountDriver_v2 extends Configured implements Tool { public static void main(String[] args) throws Exception { //配置对象 Configuration conf = new Configuration(); //todo 使用工具类ToolRunner提交程序 ctrl+p 提示参数 int status = ToolRunner.run(conf, new WordCountDriver_v2(), args); //退出 System.exit(status); } @Override public int run(String[] args) throws Exception { //mr作业的核心类 Job 一个job就表示一次作业 Job job = Job.getInstance(getConf(), "word_count"); //设置本次mr作业运行的主类 job.setJarByClass(WordCountDriver_v2.class); //设置本次mr作业的mapper类 reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //设置mapper阶段输出的key value类型 叫做程序中间的输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置Reducer阶段输出的key value类型 也就是程序最终的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置本次mr程序输入 输出的路径 FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //经过上述步骤执行 相当于将本次作业的job对象描述清楚了 接下来需要提交作业 // job.submit(); //使用下面方式提交 指定参数为true 驱动类就会追踪打印job的执行的信息 boolean result = job.waitForCompletion(true); // 最终程序退出 应该要和执行结果绑定 // 如果result为true 表示job succeeded 退出状态码为0 否则就是1 表示程序执行异常 return result ? 0:1; } }
-
-
MapReduce程序执行方式
-
MapReduce分为两种执行方式,本地模式local ,yarn集群模式。
-
所谓的执行方式之间的区别就是MapReduce程序在执行的时候,谁给分配资源。
-
本地模式:使用当前机器的本地环境,单机模拟分布式运行。由操作系统提供资源。
适合开发测试 debug方便 单机的 没有分布式的魅力了
-
集群模式:把mr程序打成jar提交给yarn集群 分配资源 进行分布式计算。
适合生产环境线上环境 分布式程序
-
-
运行在什么模式如何决定呢?
-
mapreduce.framework.name = local 本地模式
-
mapreduce.framework.name= yarn 集群模式
-
如果不指定默认是什么模式 是local模式。
-
此外还需要考虑执行环境中有没有配置 mapred-site.xml 会覆盖默认的。
-
-
MapReduce集群模式
-
step1: 代码中设置mapreduce.framework.name= yarn
-
step2:将程序打成jar包
-
step3:上传jar包到linux上(hadoop环境)
-
step4:使用命令提交yarn 保证yarn集群启动哦
#mr程序提交 hadoop jar xxx.jar [mainclass args1 args2] yarn jar xxx.jar [mainclass args1 args2] #栗子 hadoop jar example-mr-1.0.jar /wordcount/input /wordcount/output
-
-
MapReduce本地模式
-
step1:mapreduce.framework.name=local
-
step2:输入输出路径使用本地文件系统 配置参数
-
step3:直接右键运行main方法。
-
-
Q:如何去判断一个MapReduce运行的是哪种模式?
-
决定性因素是:mapreduce.framework.name 除了用户代码中设置外 一定要考虑执行环境。
比如提交jar的机器上如果有安装hadoop并且配置了环境变量 环境变量会生效
-
直接去yarn的页面上查看 只要在yarn上运行的 一定有执行记录 雁过留痕。
-
看作业的job编号
-
job_1621761293130_0003
-
job_local332454745_0001
-
-
-
MapReduce的执行流程梳理
-
MapReduce输入和输出
- 啥都不说了先上图~ 对这个步骤进行简单的解释~ 具体更详细的放在后续更新
-
-
-
数据都是以<key,value>键值对的形式存在的。不管是输入还是输出。
-
在实际编程中,也需要考虑key value的类型
-
有些是默认组件行为决定的
-
有些是业务决定的。
-
-
关于inputpath
-
指向的是一个文件,mr就处理这一个文件
-
指向的是一个目录,mr就处理该目录下所有的文件 整体当做数据集处理。
-
-
关于outputpath
-
要求指定的目录为空目录 不能够存储 否则执行校验失败
FileAlreadyExistsException: Output directory file:/D:/datasets/wordcount/output already exists
-
-
以上是关于hadoop离线day04--Hadoop MapReduce的主要内容,如果未能解决你的问题,请参考以下文章
hadoop离线day06--Hadoop MapReduceHDFS高阶
04Hadoop中的setPartitionerClass/SortComparator/GroupingComparator问题