Hadoop 复习 ---- chapter06

Posted 在人间负债^

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop 复习 ---- chapter06相关的知识,希望对你有一定的参考价值。

Hadoop 复习 ---- chapter06

  1. MapReduce 是面向大数据并行处理的计算模型、框架和平台

  2. MapReduce 框架的运作完全基于“键值对”,即数据的输入是一批“键值对”(key-value),生成的结果也是一批“键值对”,只是有时候它们的类型不一样而已

  3. Key 和 Value 的类由于需要支持被序列化(Serialize)操作,所以它们必须实现 Writable 接口,让框架对数据集的执行排序操作,key 的类还必须实现 WritableComparable 接口

  4. MapReduce 运行机制,安装时间排序包括:输入分片(input split)、map 阶段、combiner 阶段、shuffle 阶段和 reduce 阶段

  5. hadoop 的组件:hdfs、namenode、datanode、client

  6. 由 client 完成切片
    1、切片的数量 == maptask 的数量
    2、切片属于逻辑切片,不属于物理切片
    3、切片的大小默认等价于 block 的大小,默认 block == 128M

  7. 假设我们文件 258M,默认 block 128M,如何切分?
    0-127M
    128M-255M
    256M-258M

  8. hadoop 常用的 map 输入类型
    1:TextInputFormat:文本方式输入
    (1):是 hadoop 的默认输入方式
    (2):它的 RecordReader 是每行记录一个 RecordReader
    RecordReader:键为字节的偏移量,LongWritable
    RecordReader:值为每行的内容,Text
    2:SequenceFileInputFormat:用于读取序列文件
    3:NLineInputFormat:
    (1):文件以行为单位进行 split,比如文件的每一行对应一个 map
    (2):得到的 key 是每一行的位置(偏移量,LongWritable 类型),value 是每一行的内容,Text 类型
    4:keyValueInputFormat:把输入文件每一行作为单独的一个记录,通过搜寻 tab 字符来把行拆分为键值对

  9. hadoop 常用的 reduce 输出类型(reduce 写入 context 中的数据,最后写出到文本中的内容)
    1:TextOutputFormat:文本方式输出
    (1):是 hadoop 的默认输出
    (2):它的 WriteReader 是 Reducer 的最终输出的键值对
    (3):每个键值对占用一行,通过 tab 进行分割
    2:NullOutputFormat,hadoop 中的/dev/null,将输出送进黑洞。
    3:SequenceFileOutputFormat, 输出到 sequence file 格式文件
    4:MultipleSequenceFileOutputFormat,MultipleTextOutputFormat,根据 key 将记录输出到不同的文件。
    5:DBInputFormat 和 DBOutputFormat,从 DB 读取,输出到 DB

  10. Combiner 阶段
    1、选项:是可选的
    2、实质:Combiner 其实是一种本地化的 Reduce 操作
    3、目的:合并重复 key 值的操作,以提高了宽带的传输效率
    4、场景:求总数,最大值,最小值可以使用 Combiner,平均值不能使用

  11. 如何自定义一个 combiner(因为 combiner 就是 reducer),那么为什么还要自定义一个 combiner?
    可以将本地化数据提前进行合并,减少传输次数,提高传输效率

  12. hadoop 一共几个组件呢?
    hdfs:namenode + datanode + secondarynamenode
    yarn:resourcemanager + nodemanger

  13. yarn 工作原理
    1、用户向 yarn 中的 resourcemanager 提交一个应用程序
    2、resourcemanager 中的 application manager,ASM(负责整个系统的资源调度)application manager 找到一个可以运行一个 container 的 nodemanager。(container:是资源的抽象包)并在这个 container 中启动 application master 实例
    3、application master 想 resourcemanager 中的 application manager 进行注册,这时客户端就可以查询 resourcemanager 获取自己的 application master,并与之直接交互了
    4、application master 会将用户提交的应用程序拆分成一个个子任务申请资源
    5、application master 会向 resourcemanager 为每个子任务申请资源
    6、application master 为 AM 配置资源,并以 container 的形式返回
    7、AM 手里有资源了,要将资源给子任务
    8、AM 与 nodemanager 通信。在各个 container 中启动各个子任务
    9、各个子任务以 task 形式在启动的 container 中运行
    10、application master 监控各个子任务的运行。各个子任务主动汇报自己的状态和进度给 application master
    11、当应用程序运行完成后,applicationmaster 向 RM 注销并且关闭自己

  14. 作业失败处理(四种失败场景)
    1、AppMaster 失败处理
    2、MapReduce 本身抛出异常导致作业失败
    3、JVM 自动退出
    4、挂起任务

  15. 练习

word.txt
hadoop hbase hive
hadoop spark zookeeper
hbase hive

现在要对 word.txt 进行单词统计,这是一个文件,文件大小是 1kb,这么时候,有几个分片呢?
client 对 word.txt 文件进行读取,读取的过程会读取到整个文件的信息,包含这个文件的大小、位置
client 里面的 FileInputFarmat 这个类里面有 getSpilts(),来获取切片
那么这个时候,先判断文件能不能切割,如果可以切割,就进行切割
因为这个文件只有 1kb,所以只有一个切片

切完篇,读该切片。因为 FileInputFormat 是一个抽象类,所以需要一个具体的实现类,来实现。
public abstract class FileInputFormat<K, V> extends InputFormat<K, V>
TextInputFormat extends FileInputFormat<LongWritable, Text>

我们默认的 InputFarmat 是 TextInputFormat
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
我们自定义的输入数据的 key 和 value 的数据类型是系统设置的,不是我们用户定义的
LongWritable:mapper 输入数据的 key 的数据类型 == 我们所在的行首字母的位置偏移量
Text:mapper 输入数据的 value 的数据类型 == 我们当前行的数据

<0,hadoop hbase hive>,<19,hadoop spark zookeeper>,<43,hbase hive>
这个数据作为 mapper 的输入数据
mapper 阶段进行数据映射,将 word.txt 文件映射成,一个个键值对,写入到缓存中,这个缓存是 context
//<hadoop,1>,<hbase,1>,<hive,1>,<hadoop,1>,<spark,1>,<zookeeper,1>,<hbase,1>,<hive,1>

我们 mapper 的输出数据就是 reduce 的输入数据
//但是不是这样子,map和reduce中间有个过程叫shuffle
//[<hadoop,1>,<hadoop,1>],[<hbase,1>,<hbase,1>],[<hive,1>,<hive,1>],[<spark,1>],[<zookeeper,1>]
这个数据作为 reduce 的输入数据,我们对这个数据进行处理,处理完成以后,写入到 context
//<hadoop,2>,<hadoop,2>,<hive,2>,<spark,1>,<zookeeper,1>
最后将 context 中的数据写入本地,生成 part-r-oooN 文件
将 context 的每个键值对写入一行,键值对通过空格

hadoop	4
hbase	4
hive	4
spark	2
zookeeper	2
  1. 如何自定义 ProvincePartitioner ?
    1、必须继承 Partitioner 类,重写 getPartitioner()
    2、如果是红桃就将 getPartitioner() 的返回值设置 0
    3、如果是黑桃就将 getPartitioner() 的返回值设置 1
    3、如果是方片就将 getPartitioner() 的返回值设置 2
    4、如果是美化就将 getPartitioner() 的返回值设置 3
    5、在 job 对象中设置自定义的 partitioner:job.setPartitionerClass(ProvincePartitioner.class);
    6、在 job 对象中设置 ReduceTask 的数量:job.setNumReduceTasks(5);

  2. 那么我们 combiner 的数据和 reduce 的数据有什么不同?
    combiner 处理的是单个 mapTask 的数据
    reduce 处理的是所有的 mapTask 的数据
    当 reduceTask == 1,它两数据就相同了

  3. 问题一:partition 的数量决定 reducetask 的数量
    一:partition 的数量 == reducetask 的数量 == part-r-000Result 数量
    二:partition 的数量 > reducetask 的数量(Reducetask != 1),结构系统无法运行,报错:java.lang.Exception: java.io.IOException: Illegal partition for 18271575951 (4)
    三:partition 的数量 > reducetask 的数量(Reducetask == 1)== part-r-0000,分区无效了
    四:partition 的数量 < reducetask 的数量 == part-r-0000Result 数量,其中会出现空文件

split 的数量决定 mapTask 的数量(map == mapTask)
分区 Partition 的数量决定 reduceTask 的数量

问题:频繁的磁盘I/O操作会严重的降低效率,为什么要磁盘频繁的IO操作呢?
mapper的输出数据是写入到context中.
reducer的输入数据是从context中获取的.
mapper都执行完,再执行reducer.

  1. 问题二:我们 context 有多大?
    context 是内存,内存一般电脑是 8G

那么这个时候,我们想到一个解决方法.mapper将写到缓存的数据,不间断写入到本地磁盘中.
但是这种不间断的写入本地磁盘,会长期占用磁盘的I/O.这种长期占用,好吗?
不好,我们就提出了另一种解决方法.
我们将mapper的输出数据写入到一个缓存中,当这个缓存达到一定的阀值,就调用一次磁盘IO,
执行写操作.

  1. 问题三:当这个缓存全部写满我们再调用IO流写入磁盘吗?
    不可以,如果这样的话,我们mapper的写入缓存操作就必须停止.所以不能设置缓存满为阀值.
    我们设置缓存的阀值为0.8.

  2. 问题四:我们写入磁盘用mapper的写入缓存的线程吗?
    不用,需要另开一个新的磁盘写入线程,不得干扰mapper写入缓存的线程.

  3. 问题五:我们缓存是一个什么结构的缓存.
    环形结构缓存 == 环形内存缓冲区
    1、环形缓存本质是一个 buffer,是一个数组,这个数组的大小 == 100M
    2、equator:赤道。equator 在数组的前端,也在数组的最为短,你可以理解为一个环形。所以我称之为:环形缓冲区
    3、沿着 equator 顺时针存储数据,逆时针方向存储元数据(索引)
    3.1 每一个 <key, value>,对应存储一个元数据
    3.2 元数据的结构:key/value 的元数据存储的格式是 int 类型,每个 key/value 对应一个元数据,元数据由 4 个 int 组成,第一个 int 存放 value 的起始位置,第二个 int 存放 key 的起始位置,第三个 int 存放 partition,第四个 int 存放 value 的长度

  4. 注意:
    1、数据在进入环形缓存区的时候进行的分区
    2、环形缓存区 == 100M,可以通过 io.sort.mb = 100M
    3、环形缓存区的阈值 == 0.8,可以通过 mapreduce.map.io.sort.spill.percent = 0.8
    4、环形缓存区溢写到磁盘的过程,我们称之为 spill(溢写)
    5、溢写过程就是我们 shuffle 的 map 阶段的第一个过程
    6、在溢写之前有个排序过程,我们称之为 sort(排序过程)
    7、我们排序的优先级:partition > key
    所以我们 hadoop 的 mapper 和 reducer 的输入输出数据必须是序列化的,必须实现 writable 接口,若要对 key 进行排序,所以输入输出数据的 key 必须实现 comparable 接口
    8、在 sort 之后,溢写之前有个合并过程,我们称之为 combiner(合并过程,这个过程是可选的)
    9、每次溢写都会在磁盘上生成两个临时文件 spillN.out 和spillN.index.out
    我们的溢写过程很可能不是一次,所以会产生大量的溢写文件
    但是我们最终应该以一个文件的形式展现给 shuffle 的 reduce 阶段
    所以要对众多小文件,进行归并
    10、对溢写文件的归并(merge)成一个大文件的过程,我们称之为归并过程(merge)

  5. shuffle 的 reduce 阶段
    11、reduce从多个map进行拉取map最终输出数据的过程,我们称之为copy,这是shuffle的reduce阶段第一个过程12:对拉取的文件首先也是放置到一个缓存中,当缓存达到一定的阀值,输出reduce的本地.
    13、在输出缓存中的数据到reduce本地之前,要进行排序,合并.每一次输出就形成一个输出文件.
    14、对所有输出文件进行归并,合并一个大文件.
    15、这个大文件作为reduce的输入.
    16、reduce将输入文件进行处理后,输出到指定位置

纠正:分区好像是发生在溢出写过程之前,也就是当满足溢出写条件时,首先进行分区,然后分区内排序,并且选择性的combine,最后写出到磁盘。

  1. Job 作业的过程包括:split、map、combiner、shuffle、reduce

  2. split:分片,client 处理的.对文件进行分片

  3. map: 就是我们自定义的 mapper

  4. combiner:是一个可选阶段

  5. shuffle:是 map 和 reduce 阶段的一个中间阶段

  6. reduce:就是我们自定义的 reducer

  7. shuffle:是 map 和 reduce 阶段的一个中间阶段

  8. hadoop的三种调度器(调度器:将系统的中空闲资源按照一定的策略分配给作业。):FIFO FairScheduler CapacityScheduler

  9. FIFO :First In First Out
    1:hadoop1.x默认的调度器
    2:这是一个单列队列
    3:按照job的提交顺序和优先级进行排序

  10. CapacityScheduler:容量调度器
    1:hadoop2.x 3.x中默认的资源调度器
    2:是多用户,多队列调度器,以队列为单位划分集群资源
    3:能够保证每个队列可设定资源的最低保证和使用上限
    4:每个队列的按照FIFO策略执行调度
    5:不同队列可以根据自己的使用情况,租赁别的队列资源
    6:不支持优先级,但是可以设置

  11. FairScheduler
    1:是多用户,多队列调度器,所有的作业都被放置在一个能够公平共享资源池中
    2:如果该队列中只有一个job,该job获取全部资源。
    3:如果该队列再添加一个job,这两个job平分资源
    4:能够保证每一个用户都能获取一个最小额度资源。
    5:如果一个用户提交大量的作业,他不会获取所有资源,而是被资源池限制。

以上是关于Hadoop 复习 ---- chapter06的主要内容,如果未能解决你的问题,请参考以下文章

爆锤数据结构(期末复习笔记)

爆锤数据结构(期末复习笔记)

期末复习——操作系统概述 chapter(0+1)

Hadoop权威指南 第四版 高清PDF下载

Hadoop Illuminated——Chapter3 Why do I Need Hadoop?

Chapter7 Hadoop架构架构演进与生态组件