hadoop离线day05--Hadoop MapReduce
Posted Vics异地我就
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop离线day05--Hadoop MapReduce相关的知识,希望对你有一定的参考价值。
hadoop离线day05--Hadoop MapReduce
今日内容大纲
#1、MR序列化机制
什么叫做序列化 使用场景
Java中序列化机制
Hadoop序列化机制 Writable
自定义对象类型能否在MR中使用传递。
#2、自定义排序
默认字典序 a-z 正序 升序
如果需要倒序 如何实现?
Comparable接口 CompareTo方法
#3、自定义分区
默认分区规则 HashPartitioner
探究分区个数和reducetask个数之间的关系
#4、MR中优化组件 Combiner (了解)
map 局部计算
combiner局部聚合计算
reduce全局聚合计算
#5、MR的并行度机制
maptask reducetask是如何决定的
6、再次深入MapReduce 梳理流程 细节补充完整
mapper阶段执行流程
reducer阶段执行流程
shuffle 洗牌
-
MapReduce编程技巧
-
核心:牢牢把握住key 确定key是什么。
-
因为MR很多默认属性都和key相关。
排序 key字典序 分区 keyhash % numReducetask 分组 key相同
-
-
Hadoop序列化机制
-
什么是序列化
结构化对象和字节流之间转换问题。 对象--->字节流 序列化 字节流-->对象 反序列化
-
什么场合需要进行序列化
跨进程、跨网络传递数据的时候 对象数据持久化的时候
-
java中序列化
#java.io.Serializable 接口 在java中 如果对象需要序列化 只有实现java.io.Serializable接口即可。 标记用法。 java的序列化机制比较庞大臃肿,在序列化的时候不够干脆 额外携带很多附属信息。 hadoop就认为在大数据处理中效率不高。
-
hadoop序列化
//hadoop自己实现了一个序列化机制 Writable接口 public interface Writable { void write(DataOutput var1) throws IOException; //序列化方法 void readFields(DataInput var1) throws IOException;//反序列化方法 } //用户可以自己指定序列化什么属性 哪些属性 什么顺序序列化 //并且基于Writable接口 Hadoop自己封装了一套数据类型 建议在MapReduce中使用 //自己定义的对象可以在MapReduce中使用,前提是实现Writable接口
-
-
MapReduce的自定义排序
-
mr中key有默认的排序行为 :key的字典序、正序
-
思路
-
如果你的需求是正序字典序排序且你的数据类型是基本类型 此时直接把需要排序的数据作为key即可参与排序。
-
如果你的需求是倒序或者你的数据类型是对象引用类型 直接作为key不符合你的需求 重写排序规则。
-
-
java中对象是如何实现排序的
对象需要实现接口Comparable 重写CompareTo方法 a.compareTo(b) 返回0 表示相等 返回负数 表示a < b 返回正数 表示a > b #底层比较什么 http://ascii.911cha.com/ #如何实现倒序 改变结果正负即可 a 97 b 98 a.compareTo(b) -1 a < b ab -(-1) a > b ba
-
对应自定义对象类型 需要实现排序规则
-
对象中通常包含多个属性 需要在CompareTo方法中指定 根据哪些属性何种方式进行排序。
-
-
关于迭代器的遍历
-
增强for循环 iter
-
迭代器while循环遍历
Iterator<Text> iterator = values.iterator(); while (iterator.hasNext()){ iterator.next(); }
-
-
-
MapReduce的输出可以直接作为下一个MapReduce的输入来处理
-
MapReduce可以自己识别里面那些是校验文件 那些是标记文件 那些是真正的数据文件。
-
-
MapReduce自定义分区
-
思路
输出到不同文件中--->reducetask个数>1---->默认只有1个,如何变成多个--->job.setNumReduceTasks(N)--->意味着要分区partition---->默认分区规则HashPartitioner--->默认分区规则满足你业务需求吗?---->满足,直接用----->如果不满足--->重写分区规则--->如何重写呢?--->接下来仔细看!!
-
默认分区规则 HashPartitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } //核心逻辑 key.hashCode() % numReduceTasks // & Integer.MAX_VALUE 有些数据hash值是负数 通过与计算变成正数
-
扩展:实现男的在一个分区,女的在一个分区
public class AllenPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { if("男".equals(k)){ return 0; }else{ return 1; } } }
-
如何让自己的分区类在MapReduce中生效
job.setPartitionerClass(AllenPartitioner.class);
-
-
MapReduce Combiner组件
-
中文叫做规约
数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。 Combine input records=54 Combine output records=21
-
Combiner是一个优化的组件,默认情况下不存在。慎重使用。
-
以下场景慎重使用
-
结果和数据个数有关的
-
结果和数据的顺序有关的
-
-
Combiner本质就是reducer 只不过聚合范围不是全局聚合 而是局部聚合。
-
栗子
#如果评估局部聚合的逻辑和最终reduce聚合的逻辑一样 可以省去写Combiner类 直接使用Reducer代替Combiner #需要在代码中进行设置 job.setCombinerClass(xxxx)
-
MapReduce并行度机制
-
并行度是什么
-
指的是maptask reducetask个数决定机制。 多个task一起运行不就是所谓的并行嘛。
-
-
Maptask并行度机制
-
机制: 逻辑切片 原理见画图
-
Q:影响maptask个数的有哪些因素。
文件个数 文件大小 split size
-
因为maptask并行度机制可以根据数据量自适应调整maptask个数,因为在正常情况下,不需要用户干预maptask并行度的。
-
-
reducetask并行度机制
-
机制:手动决定机制。
-
默认情况下,不管数据量多大,全局只有一个reducetask.
-
用户可以通过代码去设置reducetask个数。
job.setNumReduceTasks(N)
-
一旦设置多个reducetask 需要考虑数据会分区。一旦分区导致数据输出结果在不同文件中。
-
对于需要全局统计 全局计数的需求 只能使用一个reducetask。
-
当有多个reducetask,如果数据本身不规则或者分区策略选择不好,会导致reduce阶段数据倾斜。
-
MapReduce分区、分组区别
-
分区
-
发生map阶段输出数据到缓冲区之前。
-
默认规则是Hashpartitioner.
-
导致key相同的数据会被分到同一个分区 交给同一个reducetask来处理。
-
-
分组
-
发送reduce阶段数据调用reduce方法之前。
-
默认规则是前后key一样即为一组
-
导致相同的这一组数据去一起调用reduce方法。
-
以上是关于hadoop离线day05--Hadoop MapReduce的主要内容,如果未能解决你的问题,请参考以下文章
hadoop离线day06--Hadoop MapReduceHDFS高阶
hadoop离线day06--Hadoop MapReduceHDFS高阶