五万字,57道hadoop大厂高频面试题,每一字都细心打磨,强烈建议收藏!

Posted 雷恩Layne

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了五万字,57道hadoop大厂高频面试题,每一字都细心打磨,强烈建议收藏!相关的知识,希望对你有一定的参考价值。

博主闭关两个多月,查阅了数百万字的大数据资料,结合自身的学习和工作经历,总结了大厂高频面试题,里面涵盖几乎所有我见到的大数据面试题目。

《大厂高频面试题系列》目前已总结4篇文章,且在持续更新中✍。文中用最直白的语言解释了Hadoop、Hive、Kafka、Flume、Spark等大数据技术和原理,细节也总结的很到位,是不可多得的大数据面试宝典,强烈建议收藏,祝大家都能拿到心仪的大厂offer🏆。下面是相关的系列文章:

文章目录

1. 简述hadoop1与hadoop2 的架构异同

在hadoop1时代,Hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。

在hadoop2时代,增加了Yarn 。 Yarn 只负责资 源 的 调 度 ,MapReduce 只负责运算 。另外,hadoop HA加入了对zookeeper的支持实现比较可靠的高可用。

2. 请介绍一下hadoop的HDFS

HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件; 其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色。HDFS 的使用场景:适合一次写入,多次读出的场景。一个文件经过创建、写入和关闭之后就不需要改变。

HDFS优点

1)高容错性

  • 数据自动保存多个副本。它通过增加副本的形式,提高容错性。
  • 某一个副本丢失以后,它可以自动恢复。

2)适合处理大数据

  • 数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据;
  • 文件规模:能够处理百万规模以上的文件数量,数量相当之大。

3)可构建在廉价机器上,通过多副本机制,提高可靠性。

HDFS缺点

1)不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。

2)无法高效的对大量小文件进行存储。

  • 存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的;
  • 小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。
  • HDFS的block大小可以设置,默认为128M,一个文件块目录都是占用150字节,128M能存储9亿个小文件目录信息

3)不支持并发写入、文件随机修改。

  • 一个文件只能有一个写,不允许多个线程同时写
  • 仅支持数据append(追加),不支持文件的随机修改。

3. Hadoop的运行模式有哪些

本地模式、伪分布式模式、完全分布式模式

  • 本地模式:单机运行,只是用来演示一下官方案例。生产环境不用。
  • 伪分布式模式:也是单机运行,但是具备 Hadoop 集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。
  • 完全分布式模式:多台服务器组成分布式环境。生产环境使用。

4. Hadoop生态圈的组件并做简要描述

1)Zookeeper: 是一个开源的分布式协调服务框架,为分布式系统提供一致性服务。基于zookeeper可以实现数据同步(数据发布/订阅),统计配置,命名服务。

2)Flume:是一个分布式的海量日志采集、聚合和传输的系统。

3)Hbase:是一个分布式的、面向列的开源数据库, 利用Hadoop HDFS作为其存储系统。

4)Hive:基于Hadoop的一个数据仓库工具,可以将结构化的数据档映射为一张数据库表,并提供简单的sql 查询功能,可以将sql语句转换为MapReduce任务进行运行。

5)Sqoop:将一个关系型数据库中的数据导进到Hadoop的 HDFS中,也可以将HDFS的数据导进到关系型数据库中。

5. 解释“hadoop”和“hadoop生态系统”两个概念

Hadoop是指Hadoop框架本身;hadoop生态系统,不仅包含hadoop,还包括保证hadoop框架正常高效运行其他框架,比如zookeeper、Flume、Hbase、Hive、Sqoop等辅助框架。

6. 请介绍HDFS的组成架构

(一)NameNode(nn):就是Master。NameNode负责管理整个文件系统的元数据,所有的文件路径和数据块的存储信息都保存在NameNode(其实就是文件系统中所有目录和文件inode的序列化形式)。除此之外,NameNode处理客户端读写请求,分发给DataNode去执行。

(二)DataNode:就是Slave。存储实际的数据块,执行真正的读写操作。

(三)Client:就是客户端。

  • 文件切分。文件上传HDFS的时候,Client(根据NameNode数据块的大小)将文件切分成一个一个的Block,然后进行上传
  • 与NameNode交互,获取文件的位置信息
  • 与DataNode交互,读取或者写入数据
  • Client提供一些命令来管理HDFS,比如NameNode格式化
  • Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作

(四)SecondaryNameNode:并非NameNode的热备。SecondaryNameNode定期触发CheckPoint(服务),代替NameNode合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN启动时间。同时在合并期间,NameNode也可以对外提供写操作。

7. HDFS的文件块大小有什么影响

HDFS 中的文件在物理上是分块存储 (Block ) , 块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在Hadoop2.x/3.x版本中是128M,1.x版本中是64M。

如果一个文件文件小于128M,该文件会占用128M的空间吗?不是的,它只占用文件本身大小的空间,其它空间别的文件也可以用,所以这128M的含义是HDFS数据块的大小,和每个文件的大小没有关系。

把下图的流程过一下:

思考:为什么块的大小不能设置太小,也不能设置太大?

  • HDFS的块设置太小,会增加寻址时间。例如,块的大小是1KB,文件大小是100KB,这时候要分100个块来存储文件,读取文件时要找到100个块的地址,会大大增加寻址时间。
  • 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。比如,块的大小是1TB,传输这个1TB的数据会非常慢,并且程序处理这个1TB的数据时,也非常的慢。

总结HDFS块的大小设置主要取决于磁盘传输速率。对于一般硬盘来说,传输速率为100M/s,一般设置块的大小128M,因为128是2的7次方,最接近于100M。固态硬盘一般传输速率为200M/s~300M/s,可以设置块大小为256M。在企业,128M和256M是常用的块大小。

8. HDFS的写(上传)数据流程

(1)HDFS client创建DistributedFileSystem 对象,通过该对象向 NameNode 请求上传文件,NameNode 检查权限,并判断目标文件是否已存在。
(2)如果权限许可,目标文件也存在,NameNode进行响应,返回是否可以上传文件。
(3)客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。
(4)NameNode 返回 3 个 DataNode 节点(默认副本数为3),分别为 dn1、dn2、dn3。(这一步要考虑服务器是否可用、副本存储节点选择策略、DataNode负载均衡)
(5) 客户端创建FSDataOutputStream数据流对象,通过该对象请求dn1(即DataNode1) 上传(或写)数据(即建立传输通道), dn1 收到请求会继续调用dn2建立通道,然后 dn2 调用 dn3,这样dn1~dn3的通信通道建立完成
(6)传输通道建立完成后,dn1、dn2、dn3 逐级应答客户端。
(7)客户端开始往 dn1 上传第一个 Block (先从磁盘读取数据放到一个本地内存缓存),以Packet为单位(每次发送的是一个Packet对象),dn1 收到一个 Packet (直接从内存中)传给 dn2,dn2 传给 dn3。需要注意的是,这里传输的packet大小是64K,这个64K的packet其实就是一个缓冲队列,里面包含多个(chunk和chunksum),一个chunk是512byte,其校验码chunksum是4byte。dn1 每传一个 packet会放入一个应答队列(即ack队列,起备份作用)等待应答,当所有的DataNode应答成功后,会将该packet从应答队列中移除。
(8)当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 到服务器。(重复执行 3-7 步)。

(9)所有的Block传输完毕并确认完成后,HDFS CLient关闭FSDataOutputStream数据流对象。然后,HDFS Client联系NameNode,确认数据写完成,NameNode 在内存中对其元数据进行增删改(然后再通过SecondaryNameNode对元数据进行修改)。注意,此时只是把更新操作记录到编辑日志Editlog,并没有真正合并编辑日志和镜像文件,只有触发checkPoint才合并。

9. 机架感知(副本存储节点选择策略)

Hadoop3.1.3官方说明:http://hadoop.apache.org/docs/r3.1.3/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#Data_Replication

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

默认情况下,HDFS中的数据块有3个副本。副本存储策略如下(看源码):

  • 第一个副本在Client所处的节点上。如果客户端在集群外,随机选一个。
  • 第二个副本在另一个机架的随机一个节点
  • 第三个副本在第二个副本所在机架的随机节点
  • 第四个及以上,随机选择副本存放位置。

这么选的好处:

  • 第一个副本在本地,考虑到结点距离最近,上传速度最快
  • 第二个副本在另一个机架的节点上,考虑到数据的可靠性
  • 第三个副本在第二个副本所在机架的节点,又要兼顾对应的效率

副本距离计算公式

两个节点到达最近的共同祖先的距离总和(其实就是数路径个数)。

例如,假设有数据中心 d1 机架 r1 中的节点 n1。该节点可以表示为/d1/r1/n1。利用这种标记,下图给出四种距离描述。

  • Distance(/d1/r1/n0,/d1/r1/n0)=0 :同一台服务器的距离为0
  • Distance(/d1/r1/n1,/d1/r1/n2)=2 :同一机架不同的服务器距离为2
  • Distance(/d1/r2/n0,/d1/r3/n2)=4 :不同机架的服务器距离为4

10. HDFS的读数据流程

(1)HDFS client创建DistributedFileSystem 对象,通过该对象向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
(2)挑选一台 DataNode(要考虑结点距离最近选择原则、DataNode负载均衡)服务器,请求读取数据。
(3)DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验),串行读取,即先读取第一个块,再读取第二个块拼接到上一个块后面。
(4)客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

11. NN 和 2NN 工作机制(元数据持久化机制)

(一)第一阶段:NameNode 启动
(1)第一次启动 NameNode 格式化后,创建 镜像文件fsimage 和 编辑日志edits_inprogress_001 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode 记录更新操作到edits_inprogress_001 中
(4)NameNode 在内存中对元数据进行增删改(然后再通过SecondaryNameNode对元数据进行修改)。

(二)第二阶段:Secondary NameNode 工作
(1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint(即是否需要服务),带回 NameNode是否可服务的条件。CheckPoint触发条件:定时时间到;Edits中的数据满了。
(2)Secondary NameNode 请求执行 CheckPoint(即请求服务)。
(3)NameNode 滚动正在写的 edits_inprogress_001 日志,将其命名为edits_001,并生产新的日志文件edits_inprogress_002,以后再有客户端操作,日志将记录到edits_inprogress_002中。
(4)将编辑日志edits_001和镜像文件fsimage拷贝到 SecondaryNameNode。
(5)Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件 fsimage.chkpoint。
(7)拷贝 fsimage.chkpoint 到 NameNode。
(8)NameNode 将 fsimage.chkpoint 重新命名成 fsimage(此时的fsimage是最新的)。

NN存储的是edits_inprogress,2NN存储的是edits。

12. Fsimage 和 Edits中的内容

NameNode被格式化之后,将在/opt/module/hadoop-3.1.3/data/dfs/name/current目录中产生如下文件

(1)FsImage文件包含文件系统中所有目录和文件inode的序列化形式。每个inode是一个文件或目录的元数据的内部表示,包含的信息有:文件或目录的创建、修改和访问时间、访问权限、块大小以及组成文件的块。

FsImage文件没有记录块存储在哪个数据节点。而是由NameNode把这些映射保留在内存中,当DataNode加入HDFS集群时,DataNode会把自己所包含的块列表告知给NameNode,此后会定期执行这种告知操作,以确保NameNode的块映射是最新的。

(2)Edits文件:存放HDFS文件系统的所有更新的操作,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。
(3)seen_txid文件保存的是一个数字,就是最后一个edits_的数字

思考:NameNode 如何确定下次开机启动的时候合并哪些 Edits?
可以根据seen_txid文件保存的是一个数字,seen_txid保存的就是最后一个edits_的数字,是最新的edits。

13. NameNode与SecondaryNameNode 的区别与联系

区别

(1)NameNode负责管理整个文件系统的元数据,所有的文件路径和数据块的存储信息都保存在NameNode(其实就是文件系统中所有目录和文件inode的序列化形式),除此之外,NameNode处理客户端读写请求,分发给DataNode去执行。

(2)SecondaryNameNode:并非NameNode的热备。SecondaryNameNode定期触发CheckPoint(服务),代替NameNode合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN启动时间。同时在合并期间,NameNode也可以对外提供写操作。而SecondaryNameNode至始至终不能对外提供写操作。

联系

(1)SecondaryNameNode中保存了一份和namenode一致的历史镜像文件(fsimage)和历史编辑日志(edits)。但是, NameNode还有一份正在使用的编辑日志edit_inporgress,这是SecondaryNameNode没有的

(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复历史的数据。

14. NameNode运行期间Editlog不断变大的问题

Hadoop1版本中,在NameNode运行期间,HDFS的所有更新操作都是直接写到EditLog中,久而久之, EditLog文件将会变得很大。

虽然这对NameNode运行时候是没有什么明显影响的,但是,当名称节点重启的时候,名称节点需要先将FsImage里面的所有内容映像到内存中,然后再一条一条地执行EditLog中的记录,当EditLog文件非常大的时候,会导致名称节点启动操作非常慢,而在这段时间内HDFS系统处于安全模式(即合并编辑日志EditLog和镜像文件Fsimage期间不能对外提供服务),一直无法对外提供写操作,影响了用户的使用。

如何解决?答案是:SecondaryNameNode第二名称节点

SecondaryNameNode定期触发CheckPoint,代表NameNode合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN启动时间。同时在合并期间,NameNode也可以对外提供写操作。

15. SecondaryNameNode的目的是什么

SecondaryNameNode定期触发CheckPoint,代表NameNode合并编辑日志EditLog和镜像文件Fsimage,从而减小EditLog的大小,减少NN启动时间。同时在合并期间,NameNode也可以对外提供写操作。

16. 请简述DataNode工作机制

(1)一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据的校验信息包括数据块的长度,块数据的校验和,以及时间戳
(2) DataNode 启动后向 NameNode 注册,之后周期性(默认 6 小时)的向 NameNode 上报所有的块信息。同时,DN 扫描自己节点块信息列表的时间,检查DN中的块是否完好,如果某块磁盘损坏,就将该块磁盘上存储的所有 BlockID报告给NameNode。

(3)心跳是每 3 秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。 如果超过 10 分钟 + 30s 没有收到某个 DataNode 的心跳,则认为该节点不可用。

17. DataNode掉线时长是多少

18. HDFS可靠性机制(健壮性、容错性、数据完整性机制)

HDFS主要目的是保证存储数据完整性,对于各组件的失效,做了可靠性处理。

(1)数据存储故障容错

磁盘介质在存储过程中受环境或者老化影响,其存储的数据可能会出现错乱。HDFS的应对措施是,对于存储在DataNode上的数据块,计算并存储校验和(CheckSum)。在读取数据的时候,重新计算读取出来的数据的校验和,如果校验不正确就抛出异常,应用程序捕获异常后就到其他DataNode 上读取备份数据。

(2)磁盘故障容错

如果DataNode监测到本机的某块磁盘损坏,就将该块磁盘上存储的所有 BlockID报告给NameNode,NameNode检查这些数据块还在哪些DataNode上有备份,通知相应的DataNode服务器将对应的数据块复制到其他服务器上,以保证数据块的备份数满足要求。

(3)DataNode 故障容错

DataNode会通过心跳和NameNode保持通信,如果DataNode超时未发送心跳,并超过 10 分钟 + 30s 没有收到某个 DataNode 的心跳,则NameNode认为该节点不可用,立即查找这个 DataNode上存储的数据块有哪些,以及这些数据块还存储在哪些服务器上,随后通知这些服务器再复制一份数据块到其他服务器上,保证HDFS存储的数据块备份数符合用户设置的数目,即使再出现服务器宕机,也不会丢失数据。

‘(4)NameNode故障容错

NameNode负责管理整个文件系统的元数据,所有的文件路径和数据块的存储信息都保存在NameNode(其实就是文件系统中所有目录和文件inode的序列化形式),除此之外,NameNode处理客户端读写请求,分发给DataNode去执行。如果NameNode故障,整个 HDFS系统集群都无法使用;如果 NameNode上记录的数据丢失,整个集群所有DataNode存储的数据也就没用了。

所以,NameNode高可用容错能力非常重要。NameNode可以采用HDFS NameNode 的高可用机制,具体见Hadoop高可用那一题。

总结:正是因为HDFS的这些策略,才保证存储数据完整性,为运行于Hadoop之上的应用,提供稳固的支持。

19. Hadoop集群的安全模式

安全模式:文件系统只接受读数据请求,而不接受删除、修改等变更请求

进入安全模式场景

  • NameNode 在启动时加载镜像文件和编辑日志期间处于安全模式
  • NameNode 在接收 DataNode 注册时,处于安全模式【因为NN要知道DN都有哪些块才能对外提供服务】

退出安全模式有三个条件

条件一:集群上最小可用的datanode 数量大于0

dfs.namenode.safemode.min.datanodes:最小可用 datanode 数量,默认 0。

条件二:系统中99.99%的数据块都可用了,即系统中只允许丢一个块

dfs.namenode.safemode.threshold-pct:副本数达到最小要求的 block 占系统总 block 数的
百分比,默认 0.999f。(只允许丢一个块)

条件三:集群在启动过后得过了30s之后才能退出安全模式

dfs.namenode.safemode.extension:稳定时间,默认值 30000 毫秒,即 30 秒

20. HDFS存储优化之纠删码原理

HDFS 默认情况下,一个文件有 3 个副本,这样提高了数据的可靠性,但也多带来了 2 倍的冗余开销。 Hadoop3.x 引入了纠删码, 采用计算的方式, 可以节省约 50%左右的存储空间。

RS-3-2-1024k:使用 RS 编码把一个文件拆成 3 个数据单元,生成 2 个校验单元,共 5 个单元,也就是说:这 5 个单元中,只要有任意的 3 个单元存在(不管是数据单元还是校验单元,只要总数=3),就可以得到原始数据。每个单元的大小是 1024k=1024*1024=1048576。

这个的每个单元的大小是 1024k是指最小单元为1M,如果一个文件大小为300M,则这个文件会分成300个数据单元,然后采用3-2策略,即每100个最小单元组成1个数据单元,这样就可以组成3个数据单元。这样总共900M的存储开销降到了500M。

RS-10-4-1024k:使用 RS 编码,每 10 个数据单元(cell),生成 4 个校验单元,共 14个单元,也就是说:这 14 个单元中,只要有任意的 10 个单元存在 (不管是数据单元还是校验单元,只要总数=10),就可以得到原始数据。每个单元的大小是 1024k=1024*1024=1048576。

RS-6-3-1024k:使用 RS 编码,每 6 个数据单元,生成 3 个校验单元,共 9 个单元,也就是说:这 9 个单元中,只要有任意的 6 个单元存在(不管是数据单元还是校验单元,只要总数=6),就可以得到原始数据。每个单元的大小是 1024k=1024*1024=1048576。

RS-LEGACY -6-3-1024k:策略和上面的 RS-6-3-1024k 一样,只是编码的算法用的是 rslegacy。 XOR-2-11024k:使用 XOR 编码(速度比 RS 编码快),每 2 个数据单元,生成 1 个校验单元,共 3 个单元,也就是说:这 3 个单元中,只要有任意的 2 个单元存在(不管是数据单元还是校验单元,只要总数 = 2 ),就可以得到原始数据。每个单元的大小是1024k=1024*1024=1048576。

21. HDFS存储优化之异构存储(冷热数据分离)

异构存储主要解决,不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。

存储类型

RAM_DISK:(内存镜像文件系统)
SSD:(SSD固态硬盘)
DISK:(普通磁盘,在HDFS中,如果没有主动声明数据目录存储类型默认都是DISK)
ARCHIVE:(没有特指哪种存储介质,主要的指的是计算能力比较弱而存储密度比较高的存储介质,用来解决数据量的容量扩增的问题,一般用于归档)

存储策略

说明:从Lazy_Persist到Cold,分别代表了设备的访问速度从快到慢

22. 简要介绍一下MapReduce

MapReduce 是一个分布式运算程序的编程框架,它的核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

优点

1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序, 这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。

2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。

4)适合 TB/PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点

1)不擅长实时计算

MapReduce 无法像 mysql 一样,在毫秒或者秒级内返回结果。

2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce自身的设计特点决定了数据源必须是静态的。

3)不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后, 每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。

23. MapReduce 进程有哪些

整体框架:

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序 (一个Job或Task或Mr)的过程调度及状态协调。【查到的进程就是它自己】

(2)MapTask:负责 Map 阶段的整个数据处理流程。【查到的进程是YarnChild】

(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。 【查到的进程是YarnChild】

24. MapReduce之Hadoop的序列化

(1)什么是序列化

序列化就是把内存中的对象,转换成字节序列 (或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

(2)为什么不用 Java 的序列化

Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息, Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。

(3)Hadoop 序列化特点:

  • 紧凑 :高效使用存储空间。
  • 快速:读写数据的额外开销小。
  • 互操作:支持多语言的交互

(4)如何实现hadoop的序列化

具体实现 bean 对象序列化步骤如下 7 步。

  1. 必须实现 Writable 接口
  2. 必须有空参构造。(反序列化时,需要反射调用空参构造函数,所以必须有空参构造。如果没有任何构造函数,可以不写,如果有了带参的构造函数,必须加上空参构造函数)。
  3. 重写序列化write方法
  4. 重写反序列化readFields方法
  5. 注意反序列化的顺序和序列化的顺序完全一致
  6. 要想把结果显示在文件中,需要重写 toString(),可用"\\t"分开,方便后续用(注意默认传输过来的是地址值)。
  7. 如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,重写compareTo方法,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。【也可直接实现WritableComparable】

25. 切片与 MapTask 并行度决定机制

(1)问题引入
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。

MapTask并不是也多越好,也不是越好越好。太少,并行能力较弱,会导致task等待,延长处理时间;太多,可能会导致任务启动的时间大于任务本身处理的时间,会得不偿失,并且会造成很多资源的浪费,比如1M的文件开启10个MapTask就没必要,而1G文件开启10个MapTask就很有必要了。

(2)MapTask 并行度决定机制

  • 数据块:Block 是 HDFS 物理上把数据分成一块一块(默认是128M)。数据块是 HDFS 存储数据单位。
  • 数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分成片进行存储。 数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。

数据切片与MapTask并行度决定机制

26. MapReduce的Job(或Task) 提交流程源码

Job提交流程源码详解

waitForCompletion();

submit();

// 一、建立连接
	connect();
		// 创建提交 Job 的代理
		new Cluster(getConfiguration());
			// 判断是本地运行环境还是 yarn 集群运行环境
			initialize(jobTrackAddr, conf);

// 二、提交 job
submitter.submitJobInternal(Job.this, cluster)
	// (1)创建给集群提交数据的 Stag 路径
	Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
	// (2)获取 jobid  ,并创建 Job 路径
	JobID jobId = submitClient.getNewJobID();
	// (3)拷贝 jar 包到集群
	copyAndConfigureFiles(job, submitJobDir);
	rUploader.uploadFiles(job, jobSubmitDir);
	// (4)计算切片,生成切片规划文件
	writeSplits(job, submitJobDir);
	maps = writeNewSplits(job, jobSubmitDir);
	input.getSplits(job);
	// (5)向 Stag 路径写 XML 配置文件
	writeConf(conf, submitJobFile);
	conf.writeXml(out);
	// (6)提交 Job,返回提交状态
	status  =  submitClient.submitJob(jobId,  submitJobDir.toString(),job.getCredentials());

总结:

  1. 建立连接,创建提交 Job 的集群代理
  2. 创建给集群提交数据的 Stag 路径,如果是本地运行环境使用file协议,如果是yarn集群运行环境,则使用HDFS协议
  3. 获取 jobid,将jobid和Stag路径拼接起来,用于该任务的提交路径
  4. 调用FileInputFormat.getSplits()切片规划,并序列化成Job.split
  5. 将Job相关参数写到文件Job.xml
  6. 如果是yarn集群运行环境,还需要拷贝拷贝 jar 包到集群

27. FileInputFormat切片(InputSplit)流程源码

FileInputFormat切片源码解析(input.getSplits(job)即InputSplit)【这是hadoop3.1.3的源码】

(1)程序先找到输入数据存储的目录。
(2)开始遍历处理目录下的每一个文件
(3)按照每一个文件单独切片,具体如下:

(a)获取文件大小fs.sizeOf(ss.txt)
(b)计算切片大小`computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M`
(c)默认情况下,切片大小=blocksize,如果增大切片大小,则将minSize设置大于128M,如果要减小切片大小,则将maxSize设置小于128M
(d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,大于1.1倍就划分一块切片,小于1.1倍就不再切了)
(e)将切片信息写到一个切片规划Job.split文件中
(f)整个切片的核心过程在getSplit()方法中完成
(g)InputSplit只是在物理上记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。

(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

注意:Math.max(minSize, Math.min(maxSize, blockSize));

mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。

28. FileInputFormat的实现类有哪些

思考:在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。 那么,针对不同的数据类型, MapReduce 是如何读取这些数据的呢?

FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、CombineTextInputFormat、NLineInputFormat和自定义 InputFormat 等。

(1)TextInputFormat

TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。 键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),是Text 类型。

(2)KeyValueTextInputFormat

CombineTextInputFormat 用于将多个小文件在切片过程中生成一个单独的切片或者少量的切片,以减少切片的数量。

每行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\\t");来设定分隔符。默认分隔符是tab(\\t)。分隔符前面的内容是key,分隔符后面的内容是value(Key和value的类型由用户定义)。

(3)NLineInputFormat

如果使用NLineInputFormat指定的行数N来划分切片。即输入文件的总行数 除以 N =切片数,如果不整除,切片数=商+1。

(4)CombineTextInputFormat

框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

(4.1)应用场景:
CombineTextInputFormat 用于小文件过多的场景, 它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。

(4.2)虚拟存储切片最大值设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

(4.3)切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。 举个例子来说明:
假如有四个文件,每个文件大小如下:

a.txt 1.7M
b.txt 5.1M
c.txt 3.4M
d.txt 6.8M

处理过程如下图所示

第一步:虚拟存储过程

将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个虚拟块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块; 当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时将文件均分成 2 个虚拟存储块(防止出现太小切片),最终形成多个虚拟存储的文件。

例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。

第二步:切片过程

(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片,依次类推,直到产生大于4M的切片为止。

(c) 测试举例:有 4 个小文件大小分别为 1.7M、 5.1M、 3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)

最终会形成 3 个切片,大小分别为:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

29. MapReduce 工作流程(重点)

整体框架:

MapReduce详细工作流程

参考:https://mp.weixin.qq.com/s/uAySEG98urCgwD0uB6kimg

假如有一个200M的待处理文件,具体工作流程如下:

  1. 切片:在客户端提交之前,根据参数配置,进行任务规划,将文件按128M每块进行切片。(InputSplit的切片流程,具体看27题)

  2. 提交:提交可以提交到本地工作环境或者Yarn工作环境,本地只需要提交切片规划Job.split和Job相关参数文件Job.xml,Yarn环境还需要提交jar包;本地环境一般只作为测试用。(Job提交流程,具体看26题)

  3. 将job提交到yarn上(或本地环境后),YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数(详细见后边的Yarn工作流程介绍)。

  4. MapTask中执行Mapper的map方法,此方法需要k和v作为输入参数,所以会首先获取kv值;

    • 首先调用InputFormat方法,默认为TextInputFormat方法,在此方法调用createRecoderReader方法,一行1一行读取数据封装为k,v键值对,传递给map方法
  5. map方法进行一系列用户写的逻辑操作,到这里map阶段其实已经完成了,下面进入Shuffer阶段

  6. map处理完成相关的逻辑操作之后,会产生一系列新的key/value,首先通过OutputCollector.collect()向环形缓冲区写入数据,环形缓冲区主要两部分,一部分写入数据的索引信息,另一部分写入数据的内容。环形缓冲区的默认大小是100M,当缓冲的容量达到默认大小的80%时,开启一个新的线程进行反向溢写,之所以反向溢写是因为这样就可以边接收数据边往磁盘溢写数据。

  7. 在环形缓冲区溢写到文件之前,会将缓冲区中的的数据(调用Partitioner)进行分区,并针对Key的索引按照字典顺序进行快速排序。

  8. 在分区和排序之后,溢写到磁盘,可能发生多次溢写,溢写到多个文件。

  9. 对所有溢写到磁盘的文件进行归并排序。

  10. 在9到10步之间还可以有一个Combine合并操作,意义是对每个MapTask的输出进行局部汇总,以减少网络传输量:

    • Map阶段的进程数一般比Reduce阶段要多,所以放在Map阶段处理效率更高

    • Map阶段合并之后,传递给Reduce的数据就会少很多

    • 但是Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv要和Reduce的输入kv类型对应起来。
      例如,我们要求平均值,就不符合要求。但是,如果我们要求和,那么使用Combiner后,不影响最终结果。

  11. 另外,在第10步,还可以进行压缩(主要是用snappy和lzo,主要是快),以减少网络传输量。下面是Reduce阶段

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

  1. MapTask结束后,启动相应数量的ReduceTask(和分区数量相同)
  2. ReduceTask从每个MapTask上远程拷贝(拉取)相应的数据文件,如果文件大小超过一定阈值(即超过ReduceTask进程内存缓冲区的大小),则溢写磁盘上,否则存储在内存中。(如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上)。当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序
  3. 最后将数据传给reduce进行处理,一次读取一组数据
  4. 最后通过OutputFormat输出,默认为TextOutputFormat方法,在此方法调用createRecoderWrite将结果写到HDFS中。

30. Partitioner分区与ReduceTask 并行度决定机制

MapTask 并行度由切片个数决定,切片个数由输入文件和切片规则决定。ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置,一般与getPartition的结果数相同

(1)问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(2)默认Partitioner分区

java

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

(3)自定义Partitioner步骤

① 自定义类继承Partitioner,重写getPartition()方法



public class CustomPar

以上是关于五万字,57道hadoop大厂高频面试题,每一字都细心打磨,强烈建议收藏!的主要内容,如果未能解决你的问题,请参考以下文章

2021年10月大厂高频核心前端面试题总结,五万多字,面试必考

五万字 | 耗时一个月,整理出这份Hadoop吐血宝典

五万字 | 耗时一个月,整理出这份Hadoop吐血宝典

80 道大厂算法高频面试题

含泪刷128道面试题,50万字2022最新Android11位大厂面试专题

五万字长文:Java面试知识点总结