Hadoop 复习 ---- chapter06
Posted 在人间负债^
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop 复习 ---- chapter06相关的知识,希望对你有一定的参考价值。
Hadoop 复习 ---- chapter06
-
MapReduce 是面向大数据并行处理的计算模型、框架和平台
-
MapReduce 框架的运作完全基于“键值对”,即数据的输入是一批“键值对”(key-value),生成的结果也是一批“键值对”,只是有时候它们的类型不一样而已
-
Key 和 Value 的类由于需要支持被序列化(Serialize)操作,所以它们必须实现 Writable 接口,让框架对数据集的执行排序操作,key 的类还必须实现 WritableComparable 接口
-
MapReduce 运行机制,安装时间排序包括:输入分片(input split)、map 阶段、combiner 阶段、shuffle 阶段和 reduce 阶段
-
hadoop 的组件:hdfs、namenode、datanode、client
-
由 client 完成切片
1、切片的数量 == maptask 的数量
2、切片属于逻辑切片,不属于物理切片
3、切片的大小默认等价于 block 的大小,默认 block == 128M -
假设我们文件 258M,默认 block 128M,如何切分?
0-127M
128M-255M
256M-258M -
hadoop 常用的 map 输入类型
1:TextInputFormat:文本方式输入
(1):是 hadoop 的默认输入方式
(2):它的 RecordReader 是每行记录一个 RecordReader
RecordReader:键为字节的偏移量,LongWritable
RecordReader:值为每行的内容,Text
2:SequenceFileInputFormat:用于读取序列文件
3:NLineInputFormat:
(1):文件以行为单位进行 split,比如文件的每一行对应一个 map
(2):得到的 key 是每一行的位置(偏移量,LongWritable 类型),value 是每一行的内容,Text 类型
4:keyValueInputFormat:把输入文件每一行作为单独的一个记录,通过搜寻 tab 字符来把行拆分为键值对 -
hadoop 常用的 reduce 输出类型(reduce 写入 context 中的数据,最后写出到文本中的内容)
1:TextOutputFormat:文本方式输出
(1):是 hadoop 的默认输出
(2):它的 WriteReader 是 Reducer 的最终输出的键值对
(3):每个键值对占用一行,通过 tab 进行分割
2:NullOutputFormat,hadoop 中的/dev/null,将输出送进黑洞。
3:SequenceFileOutputFormat, 输出到 sequence file 格式文件
4:MultipleSequenceFileOutputFormat,MultipleTextOutputFormat,根据 key 将记录输出到不同的文件。
5:DBInputFormat 和 DBOutputFormat,从 DB 读取,输出到 DB -
Combiner 阶段
1、选项:是可选的
2、实质:Combiner 其实是一种本地化的 Reduce 操作
3、目的:合并重复 key 值的操作,以提高了宽带的传输效率
4、场景:求总数,最大值,最小值可以使用 Combiner,平均值不能使用 -
如何自定义一个 combiner(因为 combiner 就是 reducer),那么为什么还要自定义一个 combiner?
可以将本地化数据提前进行合并,减少传输次数,提高传输效率 -
hadoop 一共几个组件呢?
hdfs:namenode + datanode + secondarynamenode
yarn:resourcemanager + nodemanger -
yarn 工作原理
1、用户向 yarn 中的 resourcemanager 提交一个应用程序
2、resourcemanager 中的 application manager,ASM(负责整个系统的资源调度)application manager 找到一个可以运行一个 container 的 nodemanager。(container:是资源的抽象包)并在这个 container 中启动 application master 实例
3、application master 想 resourcemanager 中的 application manager 进行注册,这时客户端就可以查询 resourcemanager 获取自己的 application master,并与之直接交互了
4、application master 会将用户提交的应用程序拆分成一个个子任务申请资源
5、application master 会向 resourcemanager 为每个子任务申请资源
6、application master 为 AM 配置资源,并以 container 的形式返回
7、AM 手里有资源了,要将资源给子任务
8、AM 与 nodemanager 通信。在各个 container 中启动各个子任务
9、各个子任务以 task 形式在启动的 container 中运行
10、application master 监控各个子任务的运行。各个子任务主动汇报自己的状态和进度给 application master
11、当应用程序运行完成后,applicationmaster 向 RM 注销并且关闭自己 -
作业失败处理(四种失败场景)
1、AppMaster 失败处理
2、MapReduce 本身抛出异常导致作业失败
3、JVM 自动退出
4、挂起任务 -
练习
word.txt
hadoop hbase hive
hadoop spark zookeeper
hbase hive
现在要对 word.txt 进行单词统计,这是一个文件,文件大小是 1kb,这么时候,有几个分片呢?
client 对 word.txt 文件进行读取,读取的过程会读取到整个文件的信息,包含这个文件的大小、位置
client 里面的 FileInputFarmat 这个类里面有 getSpilts(),来获取切片
那么这个时候,先判断文件能不能切割,如果可以切割,就进行切割
因为这个文件只有 1kb,所以只有一个切片
切完篇,读该切片。因为 FileInputFormat 是一个抽象类,所以需要一个具体的实现类,来实现。
public abstract class FileInputFormat<K, V> extends InputFormat<K, V>
TextInputFormat extends FileInputFormat<LongWritable, Text>
我们默认的 InputFarmat 是 TextInputFormat
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
我们自定义的输入数据的 key 和 value 的数据类型是系统设置的,不是我们用户定义的
LongWritable:mapper 输入数据的 key 的数据类型 == 我们所在的行首字母的位置偏移量
Text:mapper 输入数据的 value 的数据类型 == 我们当前行的数据
<0,hadoop hbase hive>,<19,hadoop spark zookeeper>,<43,hbase hive>
这个数据作为 mapper 的输入数据
mapper 阶段进行数据映射,将 word.txt 文件映射成,一个个键值对,写入到缓存中,这个缓存是 context
//<hadoop,1>,<hbase,1>,<hive,1>,<hadoop,1>,<spark,1>,<zookeeper,1>,<hbase,1>,<hive,1>
我们 mapper 的输出数据就是 reduce 的输入数据
//但是不是这样子,map和reduce中间有个过程叫shuffle
//[<hadoop,1>,<hadoop,1>],[<hbase,1>,<hbase,1>],[<hive,1>,<hive,1>],[<spark,1>],[<zookeeper,1>]
这个数据作为 reduce 的输入数据,我们对这个数据进行处理,处理完成以后,写入到 context
//<hadoop,2>,<hadoop,2>,<hive,2>,<spark,1>,<zookeeper,1>
最后将 context 中的数据写入本地,生成 part-r-oooN 文件
将 context 的每个键值对写入一行,键值对通过空格
hadoop 4
hbase 4
hive 4
spark 2
zookeeper 2
-
如何自定义 ProvincePartitioner ?
1、必须继承 Partitioner 类,重写 getPartitioner()
2、如果是红桃就将 getPartitioner() 的返回值设置 0
3、如果是黑桃就将 getPartitioner() 的返回值设置 1
3、如果是方片就将 getPartitioner() 的返回值设置 2
4、如果是美化就将 getPartitioner() 的返回值设置 3
5、在 job 对象中设置自定义的 partitioner:job.setPartitionerClass(ProvincePartitioner.class);
6、在 job 对象中设置 ReduceTask 的数量:job.setNumReduceTasks(5); -
那么我们 combiner 的数据和 reduce 的数据有什么不同?
combiner 处理的是单个 mapTask 的数据
reduce 处理的是所有的 mapTask 的数据
当 reduceTask == 1,它两数据就相同了 -
问题一:partition 的数量决定 reducetask 的数量
一:partition 的数量 == reducetask 的数量 == part-r-000Result 数量
二:partition 的数量 > reducetask 的数量(Reducetask != 1),结构系统无法运行,报错:java.lang.Exception: java.io.IOException: Illegal partition for 18271575951 (4)
三:partition 的数量 > reducetask 的数量(Reducetask == 1)== part-r-0000,分区无效了
四:partition 的数量 < reducetask 的数量 == part-r-0000Result 数量,其中会出现空文件
split 的数量决定 mapTask 的数量(map == mapTask)
分区 Partition 的数量决定 reduceTask 的数量
问题:频繁的磁盘I/O操作会严重的降低效率,为什么要磁盘频繁的IO操作呢?
mapper的输出数据是写入到context中.
reducer的输入数据是从context中获取的.
mapper都执行完,再执行reducer.
- 问题二:我们 context 有多大?
context 是内存,内存一般电脑是 8G
那么这个时候,我们想到一个解决方法.mapper将写到缓存的数据,不间断写入到本地磁盘中.
但是这种不间断的写入本地磁盘,会长期占用磁盘的I/O.这种长期占用,好吗?
不好,我们就提出了另一种解决方法.
我们将mapper的输出数据写入到一个缓存中,当这个缓存达到一定的阀值,就调用一次磁盘IO,
执行写操作.
-
问题三:当这个缓存全部写满我们再调用IO流写入磁盘吗?
不可以,如果这样的话,我们mapper的写入缓存操作就必须停止.所以不能设置缓存满为阀值.
我们设置缓存的阀值为0.8. -
问题四:我们写入磁盘用mapper的写入缓存的线程吗?
不用,需要另开一个新的磁盘写入线程,不得干扰mapper写入缓存的线程. -
问题五:我们缓存是一个什么结构的缓存.
环形结构缓存 == 环形内存缓冲区
1、环形缓存本质是一个 buffer,是一个数组,这个数组的大小 == 100M
2、equator:赤道。equator 在数组的前端,也在数组的最为短,你可以理解为一个环形。所以我称之为:环形缓冲区
3、沿着 equator 顺时针存储数据,逆时针方向存储元数据(索引)
3.1 每一个 <key, value>,对应存储一个元数据
3.2 元数据的结构:key/value 的元数据存储的格式是 int 类型,每个 key/value 对应一个元数据,元数据由 4 个 int 组成,第一个 int 存放 value 的起始位置,第二个 int 存放 key 的起始位置,第三个 int 存放 partition,第四个 int 存放 value 的长度 -
注意:
1、数据在进入环形缓存区的时候进行的分区
2、环形缓存区 == 100M,可以通过 io.sort.mb = 100M
3、环形缓存区的阈值 == 0.8,可以通过 mapreduce.map.io.sort.spill.percent = 0.8
4、环形缓存区溢写到磁盘的过程,我们称之为 spill(溢写)
5、溢写过程就是我们 shuffle 的 map 阶段的第一个过程
6、在溢写之前有个排序过程,我们称之为 sort(排序过程)
7、我们排序的优先级:partition > key
所以我们 hadoop 的 mapper 和 reducer 的输入输出数据必须是序列化的,必须实现 writable 接口,若要对 key 进行排序,所以输入输出数据的 key 必须实现 comparable 接口
8、在 sort 之后,溢写之前有个合并过程,我们称之为 combiner(合并过程,这个过程是可选的)
9、每次溢写都会在磁盘上生成两个临时文件 spillN.out 和spillN.index.out
我们的溢写过程很可能不是一次,所以会产生大量的溢写文件
但是我们最终应该以一个文件的形式展现给 shuffle 的 reduce 阶段
所以要对众多小文件,进行归并
10、对溢写文件的归并(merge)成一个大文件的过程,我们称之为归并过程(merge) -
shuffle 的 reduce 阶段
11、reduce从多个map进行拉取map最终输出数据的过程,我们称之为copy,这是shuffle的reduce阶段第一个过程12:对拉取的文件首先也是放置到一个缓存中,当缓存达到一定的阀值,输出reduce的本地.
13、在输出缓存中的数据到reduce本地之前,要进行排序,合并.每一次输出就形成一个输出文件.
14、对所有输出文件进行归并,合并一个大文件.
15、这个大文件作为reduce的输入.
16、reduce将输入文件进行处理后,输出到指定位置
纠正:分区好像是发生在溢出写过程之前,也就是当满足溢出写条件时,首先进行分区,然后分区内排序,并且选择性的combine,最后写出到磁盘。
-
Job 作业的过程包括:split、map、combiner、shuffle、reduce
-
split:分片,client 处理的.对文件进行分片
-
map: 就是我们自定义的 mapper
-
combiner:是一个可选阶段
-
shuffle:是 map 和 reduce 阶段的一个中间阶段
-
reduce:就是我们自定义的 reducer
-
shuffle:是 map 和 reduce 阶段的一个中间阶段
-
hadoop的三种调度器(调度器:将系统的中空闲资源按照一定的策略分配给作业。):FIFO FairScheduler CapacityScheduler
-
FIFO :First In First Out
1:hadoop1.x默认的调度器
2:这是一个单列队列
3:按照job的提交顺序和优先级进行排序 -
CapacityScheduler:容量调度器
1:hadoop2.x 3.x中默认的资源调度器
2:是多用户,多队列调度器,以队列为单位划分集群资源
3:能够保证每个队列可设定资源的最低保证和使用上限
4:每个队列的按照FIFO策略执行调度
5:不同队列可以根据自己的使用情况,租赁别的队列资源
6:不支持优先级,但是可以设置 -
FairScheduler
1:是多用户,多队列调度器,所有的作业都被放置在一个能够公平共享资源池中
2:如果该队列中只有一个job,该job获取全部资源。
3:如果该队列再添加一个job,这两个job平分资源
4:能够保证每一个用户都能获取一个最小额度资源。
5:如果一个用户提交大量的作业,他不会获取所有资源,而是被资源池限制。
以上是关于Hadoop 复习 ---- chapter06的主要内容,如果未能解决你的问题,请参考以下文章