Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析相关的知识,希望对你有一定的参考价值。
参考技术A在安装Hadoop集群的时候,我们在yarn-site.xml文件中配置了MapReduce的运行方式为yarn.nodemanager.aux-services=mapreduce_shuffle。本节就来详细介绍一下MapReduce的shuffle过程。
shuffle,即混洗、洗牌的意思,是指MapReduce程序在执行过程中,数据在各个Mapper(Combiner、Sorter、Partitioner)、Reducer等进程之间互相交换的过程。
关于上图Shuffle过程的几点说明:
说明:map节点执行map task任务生成map的输出结果。
shuffle的工作内容:
从运算效率的出发点,map输出结果优先存储在map节点的内存中。每个map task都有一个内存缓冲区,存储着map的输出结果,当达到内存缓冲区的阀值(80%)时,需要将缓冲区中的数据以一个临时文件的方式存到磁盘,当整个map task结束后再对磁盘中这个map task所产生的所有临时文件做合并,生成最终的输出文件。最后,等待reduce task来拉取数据。当然,如果map task的结果不大,能够完全存储到内存缓冲区,且未达到内存缓冲区的阀值,那么就不会有写临时文件到磁盘的操作,也不会有后面的合并。
详细过程如下:
(1)map task任务执行,输入数据的来源是:HDFS的block。当然在mapreduce概念中,map task读取的是split分片。split与block的对应关系:一对一(默认)。
此处有必要说明一下block与split:
block(物理划分):文件上传到HDFS,就要划分数据成块,这里的划分属于物理的划分,块的大小可配置(默认:第一代为64M,第二代为128M)可通过 dfs.block.size配置。为保证数据的安 全,block采用冗余机制:默认为3份,可通过dfs.replication配置。注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前的配置值。
split(逻辑划分):Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的InputFormat接口中的getSplit()方法得到的。那么,split的大小具体怎么得到呢?
首先介绍几个数据量:
totalSize:整个mapreduce job所有输入的总大小。注意:基本单位是block个数,而不是Bytes个数。
numSplits:来自job.getNumMapTasks(),即在job启动时用户利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,从方法的名称上看,是用于设置map的个数。但是,最终map的个数也就是split的个数并不一定取用户设置的这个值,用户设置的map个数值只是给最终的map个数一个提示,只是一个影响因素,而不是决定因素。
goalSize:totalSize/numSplits,即期望的split的大小,也就是每个mapper处理多少的数据。但是仅仅是期望
minSize:split的最小值,该值可由两个途径设置:
最终取goalSize和minSize中的最大值!
最终:split大小的计算原则:finalSplitSize=max(minSize,min(goalSize,blockSize))
那么,map的个数=totalSize/finalSplitSize
注意: 新版的API中InputSplit划分算法不再考虑用户设定的Map Task个数,而是用mapred.max.split.size(记为maxSize)代替
即:InputSplit大小的计算公式为:splitSize=maxminSize,minmaxSize,blockSize
接下来就简答说说怎么根据业务需求,调整map的个数。
当我们用hadoop处理大批量的大数据时,一种最常见的情况就是job启动的mapper数量太多而超出系统限制,导致hadoop抛出异常终止执行。
解决方案:减少mapper的数量!具体如下:
a.输入文件数量巨大,但不是小文件
这种情况可通过增大每个mapper的inputsize,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blocksize通常不可行,因为HDFS被hadoop namenode -format之后,blocksize就已经确定了(由格式化时dfs.block.size决定),如果要更改blocksize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能增大minSize,即增大mapred.min.split.size的值。
b.输入文件数量巨大,且都是小文件
所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。增加mapper的数量,可以通过减少每个mapper的输入做到,即减小blockSize或者减少mapred.min.split.size的值。
(2)map执行后,得到key/value键值对。接下来的问题就是,这些键值对应该交给哪个reduce做?注意:reduce的个数是允许用户在提交job时,通过设置方法设置的!
MapReduce提供partitioner接口解决上述问题。默认操作是:对key hash后再以reduce task数量取模,返回值决定着该键值对应该由哪个reduce处理。这种默认的取模方式只是为了平均reduce的处理能力,防止数据倾斜,保证负载均衡。如果用户自己对Partition有需求,可以自行定制并设置到job上。
接下来,需要将key/value以及Partition结果都写入到缓冲区,缓冲区的作用:批量收集map结果,减少磁盘IO的影响。当然,写入之前,这些数据都会被序列化成字节数组。而整个内存缓冲区就是一个字节数组。这个内存缓冲区是有大小限制的,默认100MB。当map task的输出结果很多时,就可能撑爆内存。需将缓冲区的数据临时写入磁盘,然后重新利用这块缓冲区。
从内存往磁盘写数据被称为Spill(溢写),由单独线程完成,不影响往缓冲区写map结果的线程。溢写比例:spill.percent(默认0.8)。
当缓冲区的数据达到阀值,溢写线程启动,锁定这80MB的内存,执行溢写过程。剩下的20MB继续写入map task的输出结果。互不干涉!
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是mapreduce模型的默认行为,也是对序列化的字节做的排序。排序规则:字典排序!
map task的输出结果写入内存后,当溢写线程未启动时,对输出结果并没有做任何的合并。从官方图可以看出,合并是体现在溢写的临时磁盘文件上的,且这种合并是对不同的reduce端的数值做的合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端,那么需要将这些键值对拼接到一块,减少与partition相关的索引记录。如果client设置Combiner,其会将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。注意:这里的合并并不能保证map结果中所有的相同的key值的键值对的value都合并了,它合并的范围只是这80MB,它能保证的是在每个单独的溢写文件中所有键值对的key值均不相同!
溢写生成的临时文件的个数随着map输出结果的数据量变大而增多,当整个map task完成,内存中的数据也全部溢写到磁盘的一个溢写文件。也就是说,不论任何情况下,溢写过程生成的溢写文件至少有一个!但是最终的文件只能有一个,需要将这些溢写文件归并到一起,称为merge。merge是将所有的溢写文件归并到一个文件,结合上面所描述的combiner的作用范围,归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对,如果没有,merge得到的就是键值集合,如“aaa”, [5, 8, 2, …]。注意:combiner的合理设置可以提高效率,但是如果使用不当会影响效率!
至此,map端的所有工作都已经结束!
当mapreduce任务提交后,reduce task就不断通过RPC从JobTracker那里获取map task是否完成的信息,如果获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。其实呢,reduce task在执行之前的工作就是:不断地拉取当前job里每个map task的最终结果,并对不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
1.Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fether),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘。
2.Merge过程。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy过来的数值。Copy过来的数据会先放入内存缓冲区中,这里缓冲区的大小要比map端的更为灵活,它是基于JVM的heap size设置,因为shuffler阶段reducer不运行,所以应该把绝大部分的内存都给shuffle用。
merge的三种形式:内存到内存、内存到磁盘、磁盘到磁盘。默认情况下,第一种形式不启用。当内存中的数据量达到一定的阀值,就启动内存到磁盘的merge。与map端类似,这也是溢写过程,当然如果这里设置了Combiner,也是会启动的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
3.reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。这个最终文件可能在磁盘中也可能在内存中。当然我们希望它在内存中,直接作为reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当reducer的输入文件已定,整个shuffle才最终结束。然后就是reducer执行,把结果存放到HDFS上。
MapReuce中对大数据处理最合适的数据格式是什么?
本节作为《Hadoop从入门到精通》大型专题的第三章第二节将教大家如何在Mapreduce中使用XML和JSON两大常见格式,并分析比较最适合Mapreduce大数据处理的数据格式。
在本章的第一章节介绍中,我们简单了解了Mapreduce数据序列化的概念,以及其对于XML和JSON格式并不友好。本节作为《Hadoop从入门到精通》大型专题的第三章第二节将教大家如何在Mapreduce中使用XML和JSON两大常见格式,并分析比较最适合Mapreduce大数据处理的数据格式。
MapReuce中对大数据处理最合适的数据格式是什么?
3.2.1 XML
XML自1998年诞生以来就作为一种数据格式来表示机器和人类都可读的数据。它成为系统之间数据交换的通用语言,现在被许多标准所采用,例如SOAP和RSS,并且被用作Microsoft Office等产品的开放数据格式。
MapReduce和XML
MapReduce捆绑了与文本一起使用的InputFormat,但没有支持XML,也就是说,原生Mapreduce对XML十分不友好。在MapReduce中并行处理单个XML文件很棘手,因为XML不包含其数据格式的同步标记。
问题
希望在MapReduce中使用大型XML文件,并能够并行拆分和处理。
解决方案
Mahout的XMLInputFormat可用于MapReduce处理HDFS中的XML文件。 它读取由特定XML开始和结束标记分隔的记录,此技术还解释了如何在MapReduce中将XML作为输出发送。
MapReduce不包含对XML的内置支持,因此我们转向另一个Apache项目——Mahout,一个提供XML InputFormat的机器学习系统。 要了解XML InputFormat,你可以编写一个MapReduce作业,该作业使用Mahout的XML输入格式从Hadoop的配置文件(HDFS)中读取属性名称和值。
第一步是对作业进行配置:
Mahout的XML输入格式很简陋,我们需要指定文件搜索的确切开始和结束XML标记,并使用以下方法拆分文件(并提取记录):
文件沿着HDFS块边界分成不连续的部分,用于数据本地化。
每个map任务都在特定的输入拆分上运行,map任务寻求输入拆分的开始,然后继续处理文件,直到第一个xmlinput.start。
重复发出xmlinput.start和xmlinput.end之间的内容,直到输入拆分的末尾。
接下来,你需要编写一个映射器来使用Mahout的XML输入格式。Text表单已提供XML元素,因此需要使用XML解析器从XML中提取内容。
表3.1 使用Java的STAX解析器提取内容
该map具有一个Text实例,该实例包含start和end标记之间数据的String表示。在此代码中,我们可以使用Java的内置Streaming API for XML(StAX)解析器提取每个属性的键和值并输出。
如果针对Cloudera的core-site.xml运行MapReduce作业并使用HDFS cat命令显示输出,将看到以下内容:
此输出显示已成功使用XML作为MapReduce的输入序列化格式。不仅如此,还可以支持巨大的XML文件,因为输入格式支持拆分XML。
写XML
当可以正常读XML之后,我们要解决的就是如何写XML。 在reducer中,调用main reduce方法之前和之后都会发生回调,可以使用它来发出开始和结束标记,如下所示。
表3.2 用于发出开始和结束标记的reducer
这也可以嵌入到OutputFormat中。
Pig
如果想在Pig中使用XML,Piggy Bank library(用户贡献的Pig代码库)包含一个XMLLoader。其工作方式与此技术非常相似,可捕获开始和结束标记之间的所有内容,并将其作为Pig元组中的单字节数组字段提供。
Hive
目前没有办法在Hive中使用XML,必须写一个自定义SerDe。
总结
Mahout的XmlInputFormat可帮助使用XML,但它对开始和结束元素名称的精确字符串匹配很敏感。如果元素标记包含具有变量值的属性,无法控制元素生成或者可能导致使用XML命名空间限定符,则此方法不可用。
如果可以控制输入中的XML,则可以通过在每行使用单个XML元素来简化此练习。这允许使用内置的MapReduce基于文本的输入格式(例如TextInputFormat),它将每一行视为记录并拆分。
值得考虑的另一个选择是预处理步骤,可以将原始XML转换为每个XML元素的单独行,或者将其转换为完全不同的数据格式,例如SequenceFile或Avro,这两种格式都解决了拆分问题。
现在,你已经了解如何使用XML,让我们来处理另一种流行的序列化格式JSON。
3.2.2 JSON
JSON共享XML的机器和人类可读特征,并且自21世纪初以来就存在。它比XML简洁,但是没有XML中丰富的类型和验证功能。
如果有一些代码正在从流式REST服务中下载JSON数据,并且每小时都会将文件写入HDFS。由于下载的数据量很大,因此生成的每个文件大小为数千兆字节。
如果你被要求编写一个MapReduce作业,需要将大型JSON文件作为输入。你可以将问题分为两部分:首先,MapReduce没有与JSON一起使用的InputFormat; 其次,如何分割JSON?
图3.7显示了拆分JSON问题。 想象一下,MapReduce创建了一个拆分,如图所示。对此输入拆分进行操作的map任务将执行对输入拆分的搜索,以确定下一条记录的开始。对于诸如JSON和XML之类的文件格式,由于缺少同步标记或任何其他标识记录开头,因此知道下一条记录何时开始是很有挑战性的。
JSON比XML等格式更难分割成不同的段,因为JSON没有token(如XML中的结束标记)来表示记录的开头或结尾。
问题
希望在MapReduce中使用JSON输入,并确保可以为并发读取分区输入JSON文件。
解决方案
Elephant Bird LzoJsonInputFormat被用来作为创建输入格式类以使用JSON元素的基础,该方法可以使用多行JSON。
图3.7 使用JSON和多个输入拆分的问题示例
讨论
Elephant Bird(https://github.com/kevinweil/elephant-bird)是一个开源项目,包含用于处理LZOP压缩的有用程序,它有一个可读取JSON的LzoJsonInputFormat,尽管要求输入文件是LZOP-compressed。,但可以将Elephant Bird代码用作自己的JSON InputFormat模板,该模板不具有LZOP compression要求。
此解决方案假定每个JSON记录位于单独的行上。JsonRecordFormat很简单,除了构造和返回JsonRecordFormat之外什么也没做,所以我们将跳过该代码。JsonRecordFormat向映射器发出LongWritable,MapWritable key/value,其中MapWritable是JSON元素名称及其值的映射。
我们来看看RecordReader的工作原理,它使用LineRecordReader,这是一个内置的MapReduce读取器。要将该行转换为MapWritable,读取器使用json-simple解析器将该行解析为JSON对象,然后迭代JSON对象中的键并将它们与其关联值一起放到MapWritable。映射器在LongWritable中被赋予JSON数据,MapWritable pairs可以相应地处理数据。
以下显示了JSON对象示例:
该技巧假设每行一个JSON对象,以下代码显示了在此示例中使用的JSON文件:
现在将JSON文件复制到HDFS并运行MapReduce代码。MapReduce代码写入每个JSON key/value对并输出:
写JSON
类似于3.2.1节,编写XML的方法也可用于编写JSON。
Pig
Elephant Bird包含一个JsonLoader和LzoJsonLoader,可以使用它来处理Pig中的JSON,这些加载器使用基于行的JSON。每个Pig元组都包含该行中每个JSON元素的chararray字段。
Hive
Hive包含一个可以序列化JSON的DelimitedJSONSerDe类,但遗憾的是无法对其进行反序列化,因此无法使用此SerDe将数据加载到Hive中。
总结
此解决方案假定JSON输入的结构为每个JSON对象一行。那么,如何使用跨多行的JSON对象?GitHub上有一个项目( https://github.com/alexholmes/json-mapreduce)可以在单个JSON文件上进行多个输入拆分,此方法可搜索特定的JSON成员并检索包含的对象。
你可以查看名为hive-json-serde的Google项目,该项目可以同时支持序列化和反序列化。
正如你所看到的,在MapReduce中使用XML和JSON是非常糟糕的,并且对如何布局数据有严格要求。MapReduce对这两种格式的支持也很复杂且容易出错,因为它们不适合拆分。显然,需要查看具有内部支持且可拆分的替代文件格式。
下一步是研究更适合MapReduce的复杂文件格式,例如Avro和SequenceFile。
3.3 大数据序列化格式
当使用scalar或tabular数据时,非结构化文本格式很有效。诸如XML和JSON之类的半结构化文本格式可以对包括复合字段或分层数据的复杂数据结构进行建模。但是,当处理较大数据量时,我们更需要具有紧凑序列化表单的序列化格式,这些格式本身支持分区并具有模式演变功能。
在本节中,我们将比较最适合MapReduce大数据处理的序列化格式,并跟进如何将它们与MapReduce一起使用。
3.3.1 比较SequenceFile,Protocol Buffers,Thrift和Avro
根据经验,在选择数据序列化格式时,以下特征非常重要:
代码生成——某些序列化格式具有代码生成作用的库,允许生成丰富的对象,使更容易与数据交互。生成的代码还提供了类似安全性等额外好处,以确保消费者和生产者使用正确的数据类型。
架构演变 - 数据模型随着时间的推移而发展,重要的是数据格式支持修改数据模型的需求。模式演变功能允许你添加、修改并在某些情况下删除属性,同时为读和写提供向后和向前兼容性。
语言支持 - 可能需要使用多种编程语言访问数据,主流语言支持数据格式非常重要。
数据压缩 - 数据压缩非常重要,因为可以使用大量数据。并且,理想的数据格式能够在写入和读取时内部压缩和解压缩数据。如果数据格式不支持压缩,那么对于程序员而言,这是一个很大的问题,因为这意味着必须将压缩和解压缩作为数据管道的一部分进行管理(就像使用基于文本的文件格式一样)。
可拆分性 - 较新的数据格式支持多个并行读取器,可读取和处理大型文件的不同块。文件格式包含同步标记至关重要(可随机搜索和扫描到下一条记录开头)。
支持MapReduce和Hadoop生态系统 - 选择的数据格式必须支持MapReduce和其他Hadoop生态系统关键项目,例如Hive。如果没有这种支持,你将负责编写代码以使文件格式适用于这些系统。
表3.1比较了流行的数据序列化框架,以了解它们如何相互叠加。以下讨论提供了有关这些技术的其他背景知识。
表3.1数据序列化框架的功能比较
让我们更详细地看一下这些格式。
SequenceFile
创建SequenceFile格式是为了与MapReduce、Pig和Hive一起使用,因此可以很好地与所有工具集成。缺点主要是缺乏代码生成和版本控制支持,以及有限的语言支持。
Protocol Buffers
Protocol Buffers 已被Google大量用于互操作,其优势在于其版本支持二进制格式。缺点是MapReduce(或任何第三方软件)缺乏对读取Protocol Buffers 序列化生成的文件支持。但是,Elephant Bird可以在容器文件中使用Protocol Buffers序列化。
Thrift
Thrift是Facebook内部开发的数据序列化和RPC框架,在本地数据序列化格式中不支持MapReduce,但可以支持不同的wire-level数据表示,包括JSON和各种二进制编码。 Thrift还包括具有各种类型服务器的RPC层。本章将忽略RPC功能,并专注于数据序列化。
Avro
Avro格式是Doug Cutting创建的,旨在帮助弥补SequenceFile的不足。
Parquet
Parquet是一种具有丰富Hadoop系统支持的柱状文件格式,可以与Avro、Protocol Buffers和Thrift等友好工作。尽管 Parquet 是一个面向列的文件格式,不要期望每列一个数据文件。Parquet 在同一个数据文件中保存一行中的所有数据,以确保在同一个节点上处理时一行的所有列都可用。Parquet 所做的是设置 HDFS 块大小和最大数据文件大小为 1GB,以确保 I/O 和网络传输请求适用于大批量数据。
基于上述评估标准,Avro似乎最适合作为Hadoop中的数据序列化框架。SequenceFile紧随其后,因为它与Hadoop具有内在兼容性(它设计用于Hadoop)。
你可以在Github上查看jvm-serializers项目,该项目运行各种基准测试,以根据序列化和反序列化时间等比较文件格式。它包含Avro,Protocol Buffers和Thrift基准测试以及许多其他框架。
在了解了各种数据序列化框架后,我们将在接下来几节中专门讨论这些格式。
以上是关于Hadoop从入门到精通33:MapReduce核心原理之Shuffle过程分析的主要内容,如果未能解决你的问题,请参考以下文章
大数据仓库Hive实战视频教程-HIVE完美入门学习视频教程 HIVE教程 HIVE从入门到精通