Hadoop相关
Posted ll-yao-blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop相关相关的知识,希望对你有一定的参考价值。
Hadoop
- 在Hadoop1.x时代,Hadoop中的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大,在Hadoop2.x时代,增加了Yarn。Yam只负责资源的调度,MapReduce只负责运算。
- Hadoop运行模式
本地模式、伪模式以及完全分布式模式。
3.格式化
只能格式化一次NameNode。因为格式化NameNode,会产生新的集群id,导致NameNode和DataNode的集群id不一致,集群找不到已往数据。所以,格式NameNode时,一定要先删除data数据和log日志,然后再格式化NameNode。
Hadoop - hdfs
1定义
HDFS是大数据开源框架hadoop的组件之一,全称(Hadoop Distributed File System),它是一个分布式文件系统,由多台服务器联合起来实现文件存储功能,通过目录树来定位文件,集群中的服务器都有有各自的角色。
HDFS不支持存储小文件,HDFS目前默认块大小在Hadoop2.x版本中是128M。
2 特点
- 数据通过副本存储,提高容错性
- 能够处理PB级及以上数据,可处理百万级文件数量
- 节约成本,可分布在“廉价”机器上
- 不适合低延时数据访问
- 不适合对大量文件的存储和访问
- 单线程操作文件不能多用户执行写操作,并且不支持文件随机修改
3 HDFS组成架构概述
1) NameNode(nn) : 存储文件的元数据:文件名、文件目录结构,文件属性(生成时间、副本数、文件权限) .以及每个文件的块列表和块所在的DataNode等。
1.负责接受客户端读写数据请求
2.负责数据块副本的存储策略
3.负责管理快数据的映射关系
4.储存元数据信息
2) DataNode (dn): 在本地文件系统存储文件块数据,以及块数据的校验。
1.存储实际的数据块
2.真实处理数据块的读/写操作
3) Secondary NameNode(2nn) : 用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元教据的快照。
4 Hdfs优缺点
优点:高容错性、适合处理大数据、
可构建在廉价机器上,通过多副本机制,提高可靠性
缺点:不适合低延时数据访问
无法高效的对大量小文件进行存储
不支持并发写入、文件随机修改。
5 bin/hadoop fs | bin/hdfs dfs 命令分类
本地文件 -> HDFS
-put 将本地数据上传至hdfs
-copyFromLocal 将本地文件数据拷贝到hdfs
-moveFromLocal 将本地文件数据移动到hdfs,成功后本地数据会删除
-appendToFile 追加一个文件到已经存在的文件末尾
HDFS与HDFS之间
-ls 查看hdfs文件目录
-mkdir 在HDFS上创建目录
-rm 删除文件或者文件夹
-rmr | -rm -r 递归删除
-cp 从一个目录拷贝文件至另一目录
-mv 在HDFS目录中移动文件
-chown 修改文件所属用户权限
-chmod 修改文件所属读写权限
-du -h 文件夹暂用的空间大小
-df -h 查看系统分区情况
-cat 查看文件
HFDS -> 本地
-get 从hdfs下载文件至本地
-getmerge 合并hdfs目录下的文件至本地
-copyToLocal 从hdfs拷贝文件至本地
其他
-setrep 设置文件副本数(注意:具体的副本数得看DataNode的数量)
-help 帮助命令
6 HDFS的数据流
(一) hdfs的文件上传的流程
1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。
2)NameNode返回是否可以上传。
3)客户端请求第一个 Block上传到哪几个DataNode服务器上。
4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。
5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
6)dn1、dn2、dn3逐级应答客户端。
7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
—— 在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。
—— 副本存储节点选择
第一个副本在client所处的节点上。如果客户端在集群外,随机选一个。
第二个副本和第一个副本位于相同机架,随机节点。
第三个副本位于不同机架,随机节点。
(二) hdfs的文件下载的流程
1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。
3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。
七 NN和2NN工作机制
1. 第一阶段:NameNode启动
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志,更新滚动日志。
(4)NameNode在内存中对元数据进行增删改。
2. 第二阶段:Secondary NameNode工作
通常情况下,SecondaryNameNode每隔一小时执行一次。
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
八 NameNode故障处理
方法一:将SecondaryNameNode中数据拷贝到NameNode存储数据的目录;
1. kill -9 NameNode进程
2. 删除NameNode存储的数据(/opt/module/hadoop-2.7.2/data/tmp/dfs/name)
3. 拷贝SecondaryNameNode中数据到原NameNode存储数据目录。
4. 重新启动NameNode
方法二:使用-importCheckpoint选项启动NameNode守护进程,从而将SecondaryNameNode中数据拷贝到NameNode目录中。
- 修改hdfs-site.xml
2. kill -9 NameNode进程
3. 删除NameNode存储的数据(/opt/module/hadoop-2.7.2/data/tmp/dfs/name)
4. 如果SecondaryNameNode不和NameNode在一个主机节点上,需要将SecondaryNameNode存储数据的目录拷贝到NameNode存储数据的平级目录,并删除in_use.lock文件
5. 导入检查点数据(等待一会ctrl+c结束掉)
6. 启动NameNode
九 集群安全模式
集群处于安全模式,不能执行重要操作(写操作)。集群启动完成后,自动退出安全模式。
(1)bin/hdfs dfsadmin -safemode get (功能描述:查看安全模式状态)
(2)bin/hdfs dfsadmin -safemode enter (功能描述:进入安全模式状态)
(3)bin/hdfs dfsadmin -safemode leave (功能描述:离开安全模式状态)
(4)bin/hdfs dfsadmin -safemode wait (功能描述:等待安全模式状态)
十 DataNode工作机制
1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
2)DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。
3)心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
4)集群运行中可以安全加入和退出一些机器。
十一 节点
(一)服役新数据节点
1. 环境准备
(1)在hadoop104主机上再克隆一台hadoop105主机
(2)修改IP地址和主机名称
(3)删除原来HDFS文件系统留存的文件(/opt/module/hadoop-2.7.2/data和log)
(4)source一下配置文件
2. 服役新节点具体步骤
(1)直接启动DataNode,即可关联到集群
(2)在hadoop105上上传文件
(3)如果数据不均衡,可以用命令实现集群的再平衡
(二)退役旧数据节点
1. 添加白名单。
添加到白名单的主机节点,都允许访问NameNode,
不在白名单的主机节点,都会被退出。
2.黑名单退役
在黑名单上面的主机都会被强制退出。
注意:不允许白名单和黑名单中同时出现同一个主机名称。
十二 小文件存档
1. HDFS存储小文件弊端
每个文件均按块存储,每个块的元数据存储在NameNode的内存中,因此HDFS存储小文件会非常低效。因为大量的小文件会耗尽NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量和数据块的大小无关。例如,一个1MB的文件设置为128MB的块存储,实际使用的是1MB的磁盘空间,而不是128MB。
- 解决存储小文件办法之一
HDF S存档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的访问。具体说来,HDFS存档文件对内还是一个一个独立文件,对NameNode而言却是一个整体,减少了NameNode的内存。
十三 HDFS HA高可用
实现高可用最关键的策略是消除单点故障。
HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。
HDFS-HA工作机制:通过双NameNode消除单点故障
Hadoop - yarn
一. YARN架构概述
1. 定义
Yarn是一个分布式资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
2.yarn基本架构
1.ResourceManager:处理客户端请求,监控nodeManager,
启动或监控ApplicationMaster,资源调度分配和调度
2.NameNode: 管理单个节点上的资源,处理来自rm和app的命令
3.ApplicationMaster:数据切分,为应用程序申请资源并分配给内部的任务,
任务的监控和容错
4.container: 封装节点上的多维度资源
二. yarn作业提交过程
(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
三 资源调度器
目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
四 任务的推测执行
1.作业完成时间取决于最慢的任务完成时间
2.推测执行机制
发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
3.执行推测任务的前提条件
(1)每个Task只能有一个备份任务
(2)当前Job已完成的Task必须不小于0.05(5%)
(3)开启推测执行参数设置。mapred-site.xml文件中默认是打开的。
4.不能启用推测执行机制情况
(1)任务间存在严重的负载倾斜;
(2)特殊任务,比如任务向数据库中写数据。
5.算法原理
(1)MR总是选择(推测执行完时刻 - 备份任务推测完成时刻)差值最大的任务,并为之启动备份任务。
(2)为了防止大量任务推测执行启动备份任务造成的资源浪费,MR为每个作业设置了同时启动的备份任务数目上限。
(3)采用以空间换时间的策略。注意:在集群资源紧缺的情况下,应合理使用该机制,在用少量资源的情况下,减少作业的计算时间。
Hadoop - MapReduce
1 MapReduce概述
(一)MapReduce定义
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
(二)MapReduce架构概述
MapReduce将计算过程分为两个阶段:Map和Reduce。
1)Map阶段并行处理输入数据
2)Reduce阶段对Map结果进行汇总
2. MapReduce优缺点
优点:易于编程、良好的扩展性、高容错性、适合处理大数据。
缺点:不擅长实时计算、不擅长流式计算、不擅长有向图计算。
3.核心编程思想
1)分布式的运算程序往往需要分成至少2个阶段。
2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。
4.mr运行过慢的原因有哪些?
- 计算机性能瓶颈,硬件配置低,服务器性能跟不上。
- 出现数据倾斜、Map和Reduce数设置不合理、处理大量小文件、大量的不可分块的超大文件、、Spill溢写次数过多、合并文件次数过多
5.MapReduce编程规范
用户编写的程序分成三个部分:Mapper、Reducer和Driver。
1. Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2) Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3) Mapper中的业务逻辑写在map0方法中
(4) Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5) map0方法 (MapTask进程) 对每一个<K,V>调用一次
2. Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2) Reducer的输 入数据类型对应Mapper的输出数据类型,也是KV
(3) Reducer的业务逻辑写在reduce()方法中
(4) ReduceTask进程对每一组相同k:的<k;>组调用- -次reduce0方法
3. Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
2.5.常用数据序列化类型
Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
2.6.序列化概述
2.6.1. 什么是序列化
l序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
l反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2.6.2.为什么要序列化
l一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
2.6.3.为什么不用Java的序列化
lJava的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
2.6.4.Hadoop序列化特点:
l紧凑 :高效使用存储空间。
l快速:读写数据的额外开销小。
l可扩展:随着通信协议的升级而可升级
l互操作:支持多语言的交互
2.7.FileInputFormat切片机制
2.7.1.切片机制
l简单地按照文件的内容长度进行切片
l切片大小,默认等于Block大小
l切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
2.7.2.FileInputFormat切片大小的参数配置
2.7.2.1.源码中计算切片大小的公式
lMath.max(minSize, Math.min(maxSize, blockSize));
lmapreduce.input.fileinputformat.split.minsize=1 默认值为1
lmapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
l因此,默认情况下,切片大小=blocksize。
2.7.2.2.切片大小设置
lmaxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
lminsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。
2.7.2.3.获取切片信息API
l// 获取切片的文件名称
lString name = inputSplit.getPath().getName();
l// 根据文件类型获取切片信息
lFileSplit inputSplit=(FileSplit)context.getInputSplit();
2.8. CombineTextInputFormat切片机制
l框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
2.8.1.应用场景:
lCombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
2.8.2.虚拟存储切片最大值设置
lCombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
l注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
2.8.3.切片机制
2.8.3.1.生成切片过程包括:虚拟存储过程和切片过程二部分。
2.8.3.1.1.虚拟存储过程:
l将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
2.8.3.2.切片过程:
l判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
l如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
l测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
2.8.4.Combiner合并
lCombiner是MR程序中Mapper和Reducer之外的一种组件。
lCombiner组件的父类就是Reducer。
lCombiner和Reducer的区别在于运行的位置
ØCombiner是在每一个MapTask所在的节点运行;
ØReducer是接收全局所有Mapper的输出结果;
lCombiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
lCombiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4 Reducer
(3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
2.9.MapTask工作机制
lRead阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
lMap阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
lCollect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
lSpill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
lCombine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
l当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
l在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
l让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
2.10.溢写阶段详情
l步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
l步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
l步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
2.11.ReduceTask工作机制
lCopy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
lMerge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
lSort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
lReduce阶段:reduce()函数将计算结果写到HDFS上。
2.12.shuffle流程概述?
l key,value从map()方法输出后,被outputcollector收集通过HashPartitioner类的getpartitioner()方法获取分区号,进入环形缓冲区。默认情况下,环形缓冲区为100MB。当环形环形缓冲区存储达到80%,开始执行溢写过程,溢写过程中如果有其他数据进入,那么由剩余的20%反向写入。溢写过程会根据key,value,先根据分区进行排序,再根据key值进行排序,后生成一个溢写文件(再对应的分区下根据key值有序),maptask将溢写文件归并排序后落入本地磁盘,reduceTask阶段将多个mapTask下相同分区的数据copy到指定的reduceTask中进行归并排序、分组后一次读取一组数据给reduce()函数。
2.13.Join多种应用
2.13.1.Reduce Join工作原理
lMap端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
lReduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
2.14.MR支持的压缩编码
压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 否 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 否 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 是 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 是 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 否 和文本处理一样,不需要修改
2.15.MapReduce 跑的慢的原因
2.15.1.MapReduce 程序效率的瓶颈在于两点
l计算机性能
ØCPU、内存、磁盘健康、网络
lI/O 操作优化
Ø数据倾斜
ØMap和Reduce数设置不合理
ØMap运行时间太长,导致Reduce等待过久
Ø小文件过多
Ø大量的不可分块的超大文件
ØSpill次数过多
ØMerge次数过多等。
2.16.MapReduce优化方法
lMapReduce优化方法主要从六个方面考虑:
l数据输入、
lMap阶段、
lReduce阶段、
lIO传输、
l数据倾斜问题和常用的调优参数
2.16.1.数据输入
l合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。
l采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。
2.16.2.Map阶段
l减少溢写(Spill)次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。
l减少合并(Merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。
l在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少 I/O。
2.16.3.Reduce阶段
l合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。
l设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。
l规避使用Reduce:因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
l合理设置Reduce端的Buffer:默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销:mapreduce.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。
2.16.4.I/O传输
l采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。
l使用SequenceFile二进制文件。
2.16.5.数据倾斜问题
2.16.5.1.数据倾斜现象
l数据频率倾斜——某一个区域的数据量要远远大于其他区域。
l数据大小倾斜——部分记录的大小远远大于平均值。
2.16.5.2.减少数据倾斜的方法
2.16.5.2.1.方法一:抽样和范围分区
l可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
2.16.5.2.2.方法二:自定义分区
l基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。
2.16.5.2.3.方法三:Combine
l使用Combine可以大量地减小数据倾斜。在可能的情况下,Combine的目的就是聚合并精简数据。
l方法四:采用Map Join,尽量避免Reduce Join
2.17.小文存在的弊端(坏处)? 如何解决?
2.17.1.弊端
lHDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。
2.17.2.解决方案一
l在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
l在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
l在MapReduce处理时,可采用CombineTextInputFormat提高效率。
2.17.3.解决方案二
2.17.3.1.Hadoop Archive
l是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用。
2.17.3.2.Sequence File
lSequence File由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件。
2.17.3.3.CombineFileInputFormat
lCombineFileInputFormat是一种新的InputFormat,用于将多个文件合并成一个单独的Split,另外,它会考虑数据的存储位置。
2.17.4.解决方案三
2.17.4.1.开启JVM重用
l对于大量小文件Job,可以开启JVM重用会减少45%运行时间。
lJVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map。
l具体设置:mapreduce.job.jvm.numtasks值在10-20之间。
3.Yarn
3.1.工作机制详解
lMR程序提交到客户端所在的节点。
lYarnRunner向ResourceManager申请一个Application。
lRM将该应用程序的资源路径返回给YarnRunner。
l该程序将运行所需资源提交到HDFS上。
l程序资源提交完毕后,申请运行mrAppMaster。
lRM将用户的请求初始化成一个Task。
l其中一个NodeManager领取到Task任务。
l该NodeManager创建容器Container,并产生MRAppmaster。
lContainer从HDFS上拷贝资源到本地。
lMRAppmaster向RM 申请运行MapTask资源。
lRM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
lMR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
lMrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
lReduceTask向MapTask获取相应分区的数据。
l程序运行完毕后,MR会向RM申请注销自己。
3.2.作业提交全过程详解
3.2.1.作业提交
l作业提第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
l第2步:Client向RM申请一个作业id。
l第3步:RM给Client返回该job资源的提交路径和作业id。
l第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
l第5步:Client提交完资源后,向RM申请运行MrAppMaster。
3.2.2.作业初始化
l第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
l第7步:某一个空闲的NM领取到该Job。
l第8步:该NM创建Container,并产生MRAppmaster。
l第9步:下载Client提交的资源到本地。
3.2.3.任务分配
l第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
l第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
3.2.4.任务运行
l第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
l第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
l第14步:ReduceTask向MapTask获取相应分区的数据。
l第15步:程序运行完毕后,MR会向RM申请注销自己。
3.2.5.进度和状态更新
lYARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
3.2.6.作业完成
l除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
3.3.资源调度器
3.3.1.目前,Hadoop作业调度器主要有三种:
lFIFO、Capacity Scheduler和Fair Scheduler。
lHadoop2.7.2默认的资源调度器是Capacity Scheduler。
3.3.2.先进先出调度器(FIFO)
3.3.3.容量调度器(Capacity Scheduler)
3.3.4.公平调度器(Fair Scheduler)
3.4.在搭建集群的过程中我们需要开启哪些进程服务才能确保集群能够正常的读取数据、以及在Yarn上完成计算任务?
lnamenode datanode secondarynamenode
lresourcemanager nodemanager
以上是关于Hadoop相关的主要内容,如果未能解决你的问题,请参考以下文章