一文即懂MapReduce

Posted 写轮眼之大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文即懂MapReduce相关的知识,希望对你有一定的参考价值。

我发现,

老鼠偷东西,

只是人类的说法,

在老鼠那里,

这叫觅食。

---《我发现》姜二嫚 八岁

一、MapReduce是什么

Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,

一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集。

二、MapReduce做什么

  MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。

  (1)Mapper负责“,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。

  (2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。

一个比较形象的语言解释MapReduce:  

我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

三、MapReduce工作机制

一文即懂MapReduce

1. MapReduce的整个工作过程如上图所示,它包含如下4个独立的实体:

  实体一:客户端,用来提交MapReduce作业。

  实体二:JobTracker,用来协调作业的运行。

  实体三:TaskTracker,用来处理作业划分后的任务。

  实体四:HDFS,用来在其它实体间共享作业文件。

一文即懂MapReduce

2.详细工作流程

一文即懂MapReduce


一文即懂MapReduce

流程详解

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从7步开始16结束,具体Shuffle过程详解如下:

1MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

3. mapreduce程序在分布式运行时的实例进程

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、Yarnchild:负责 map 阶段的整个数据处理流程

3、Yarnchild:负责 reduce 阶段的整个数据处理流程 以上两个阶段 MapTask 和 ReduceTask 的进程都是 YarnChild,并不是说这 MapTask 和 ReduceTask 就跑在同一个 YarnChild 进行里

四、MapTask并行度

4.1 maptask的并行度

Hadoop中MapTask的并行度的决定机制。在MapReduce程序的运行中,并不是MapTask越多就越好。需要考虑数据量的多少及机器的配置。如果数据量很少,可能任务启动的时间都远远超过数据的处理时间。同样可不是越少越好。

那么应该如何切分呢?

假如我们有一个300M的文件,它会在HDFS中被切成3块。0-128M,128-256M,256-300M。并被放置到不同的节点上去了。在MapReduce任务中,这3个Block会被分给3个MapTask。

MapTask在任务切片时实际上也是分配一个范围,只是这个范围是逻辑上的概念,与block的物理划分没有什么关系。但在实践过程中如果MapTask读取的数据不在运行的本机,则必须通过网络进行数据传输,对性能的影响非常大。所以常常采取的策略是就按照块的存储切分MapTask,使得每个MapTask尽可能读取本机的数据。

如果一个Block非常小,也可以把多个小Block交给一个MapTask。

所以MapTask的切分要看情况处理。默认的实现是按照Block大小进行切分。MapTask的切分工作由客户端(我们写的main方法)负责。一个切片就对应一个MapTask实例。

4.2 maptask并行度决定机制

1个job的map阶段并行度由客户端在提交job时决定。

而客户端对map阶段并行度的规划的基本逻辑为:

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

切片机制:

FileInputFormat 中默认的切片机制:

1、简单地按照文件的内容长度进行切片

2、切片大小,默认等于 block 大小

3、切片时不考虑数据集整体,而是逐个针对每一个文件单独切片 比如待处理数据有两个文件:

  File1.txt 200M

  File2.txt 100M

经过 getSplits()方法处理之后,形成的切片信息是:

File1.txt-split1 0-128M

File1.txt-split2 129M-200M

File2.txt-split1 0-100M

FileInputFormat 中切片的大小的参数配置

 通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑: 

long splitSize = computeSplitSize(blockSize, minSize, maxSize),

翻译一下就是求这三个值的中 间值

切片主要由这几个值来运算决定:

blocksize:默认是 128M,可通过 dfs.blocksize 修改

minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改

maxsize:默认是 Long.MaxValue,可通过 mapreduce.input.fileinputformat.split.maxsize 修改

因此,如果 maxsize 调的比 blocksize 小,则切片会小于 blocksize 如果 minsize 调的比 blocksize 大,则切片会大于 blocksize 但是,不论怎么调参数,都不能让多个小文件“划入”一个 split


五、ReduceTask并行度

5.1 reducetask并行度

reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由 切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);

  • 默认值是 1,

  • 手动设置为 4,表示运行 4 个 reduceTask,

  • 设置为 0,表示不运行 reduceTask 任务,也就是没有 reducer 阶段,只有 mapper 阶段

如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜

注意:reducetask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全 局汇总结果,就只能有 1 个 reducetask

尽量不要运行太多的 reducetask。对大多数 job 来说,最好 rduce 的个数最多和集群中的 reduce 持平,或者比集群的 reduce slots 小。这个对于小集群而言,尤其重要。

5.2 reducetask并行度决定机制

  1. job.setNumReduceTasks(number);

  2. job.setReducerClass(MyReducer.class);

  3. job.setPartitioonerClass(MyPTN.class);

分以下几种情况讨论:

1、如果number为1,并且2已经设置为自定义Reducer, reduceTask的个数就是1
不管用户编写的MR程序有没有设置Partitioner,那么该分区组件都不会起作用

2、如果number没有设置,并且2已经设置为自定义Reducer, reduceTask的个数就是1
在默认的分区组件的影响下,不管用户设置的number,不管是几,只要大于1,都是可以正常执行的。
如果在设置自定义的分区组件时,那么就需要注意:
你设置的reduceTasks的个数,必须要 ==== 分区编号中的最大值 + 1
最好的情况下:分区编号都是连续的。
那么reduceTasks = 分区编号的总个数 = 分区编号中的最大值 + 1

3、如果number为 >= 2 并且2已经设置为自定义Reducer reduceTask的个数就是number
底层会有默认的数据分区组件在起作用

4、如果你设置了number的个数,但是没有设置自定义的reducer,那么该mapreduce程序不代表没有reducer阶段
真正的reducer中的逻辑,就是调用父类Reducer中的默认实现逻辑:原样输出
reduceTask的个数 就是 number
5、如果一个MR程序中,不想有reducer阶段。那么只需要做一下操作即可:
job.setNumberReudceTasks(0);
整个MR程序只有mapper阶段。没有reducer阶段。
那么就没有shuffle阶段

5.3 partitioner的作用

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner

5.4 combinar的作用

combiner其实属于优化方案,由于带宽限制,应该尽量map和reduce之间的数据传输数量。它在Map端把同一个key的键值对合并在一起并计算,计算规则与reduce一致,所以combiner也可以看作特殊的Reducer。

执行combiner操作要求开发者必须在程序中设置了combiner(程序中通过job.setCombinerClass(myCombine.class)自定义combiner操作)。

Combiner组件是用来做局部汇总的,就在mapTask中进行汇总;Reducer组件是用来做全局汇总的,最终的,最后一次汇总。

六、MapReduce的shuffle详解

1、MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle

2、Shuffle: 数据混洗 ——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并 排序)

3、具体来说:就是将 MapTask 输出的处理结果数据,按照 Partitioner 组件制定的规则分发 给 ReduceTask,并在分发的过程中,对数据按 key 进行了分区和排序

6.1 MapReduce的Shuffle过程介绍:

Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:

一文即懂MapReduce

Spill过程:

Spill过程包括输出、排序、溢写、合并等步骤,如图所示:

一文即懂MapReduce

Collect:

每个Map任务不断地以对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:

一文即懂MapReduce

Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个和索引写完之后,Kvindex跳到-32位置。

Kvbuffer的大小虽然可以通过参数设置,但是总共就那么大,和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。

关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Sort:

先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

Spill:

Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。

所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。(spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill12.out文件也不一定在同一个目录下。)

每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:

一文即懂MapReduce

在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:

一文即懂MapReduce

Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

Merge:

Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。

Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑的。)

然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。

一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。

然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。

最终的索引数据仍然输出到Index文件中。

Map端的Shuffle过程到此结束。

Copy:

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。

Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。

如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。

有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并

Merge Sort:

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。

Reduce端的Shuffle过程至此结束。

七、MapReduce优化

1.小文件优化

  • 从源头干掉,也就是在hdfs上我们不存储小文件,也就是数据上传hdfs的时候我们就合并小文件

  • 在FileInputFormat读取入数据的时候我们使用实现类CombineFileInputFormat读取数据,在读取数据的时候进行合并。

2.数据倾斜问题优化

那么为什么会产生数据倾斜呢?

数据本身就不平衡,所以在默认的hashpartition时造成分区数据不一致问题,还有就是代码设计不合理等。

那如何解决数据倾斜的问题呢?

  1. 既然默认的是hash算法进行分区,那我们自定义分区,修改分区实现逻辑,结合业务特点,使得每个分区数据基本平衡;

  2. 既然有默认的分区算法,那么我们可以修改分区的键,让其符合hash分区,并且使得最后的分区平衡,比如在key前加随机数n-key;

  3. 既然reduce处理慢,我们可以增加reduce的内存和vcore呀,这样挺高性能就快了,虽然没从根本上解决问题,但是还有效果;

  4. 既然一个reduce处理慢,那我们可以增加reduce的个数来分摊一些压力呀,也不能根本解决问题,还是有一定的效果。

那么如果不是数据倾斜带来的问题,而是节点服务有问题造成某些map和reduce执行缓慢呢?

那么我们可以使用推测执行呀,你跑的慢,我们可以找个其他的节点重启一样的任务竞争,谁快谁为准。推测执行时以空间换时间的优化。会带来集群资源的浪费,会给集群增加压力,所以我司集群的推测执行都是关闭的。其实在作业执行的时候可以偷偷开启的呀

推测执行参数控制:

 
mapreduce.map.speculativemapreduce.reduce.speculative

3. mapreduce过程优化

MAP端:

那么map算是完整了,在reduce拉取数据之前,我们完全还可以combiner呀(不影响最终结果的情况下),此时会根据Combiner定义的函数对map的结果进行合并这样就可以减少数据的传输,降低磁盘io,提高性能了。

我司实际使用:mapreduce.map.memory.mb=3G(默认1G)#调整每个map的资源来提升map的处理能力mapreduce.map.cpu.vcores=1(默认也是1) #调整每个map的资源来提升map的处理能力我司调整mapreduce.task.io.sort.mb=512 #环形数据结构缓冲默认100Mmapreduce.map.sort.spill.percent #参数控制每达到80%都会重写溢写到一个新的文件 mapreduce.task.io.sort.factor #排序文件时,一次最多合并的文件数,默认10 #我司调整mapreduce.task.io.sort.factor=64 #文件溢写完后,会对这些文件进行合并,默认每次合并10         mapreduce.task.io.sort.mb #排序map输出所需要使用内存缓冲的大小,以兆为单位, 默认为100mapreduce.map.sort.spill.percent #map输出缓冲和用来磁盘溢写过程的记录边界索引,这两者使用的阈值,默认0.8mapreduce.map.output.compress    #在map溢写磁盘的过程是否使用压缩,默认false #(defaultfalse)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压, #在实际经验中HiveHadoop的运行的瓶颈一般都是IO而不是CPUorg.apache.hadoop.io.compress.SnappyCodec #map溢写磁盘的压缩算法,默认org.apache.hadoop.io.compress.DefaultCodec #我司使用org.apache.hadoop.io.compress.SnappyCodec算法,但这个过程会消耗CPU,适合IO瓶颈比较大。mapreduce.shuffle.max.threads #该参数表示每个节点管理器的工作线程,用于map输出到reduce,默认为0,表示可用处理器的两倍

REDUCE端:

首先我们可以通过参数设置合理的reduce个数(mapreduce.job.reduces参数控制),以及通过参数设置每个reduce的资源,mapreduce.reduce.memory.mb=5G(默认1G)
mapreduce.reduce.cpu.vcores=1(默认为1)。

reduce在copy的过程中默认使用5(mapreduce.reduce.shuffle.parallelcopies参数控制)个并行度进行复制数据,我司调了mapreduce.reduce.shuffle.parallelcopies=100.reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。reduce下载线程的这个最大的下载时间段是可以通过mapreduce.reduce.shuffle.read.timeout(default180000秒)调整的。

Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7)控制的。意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task,内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。

copy完成后,reduce进入归并排序阶段,合并因子默认为10(mapreduce.task.io.sort.factor参数控制),如果map输出很多,则需要合并很多趟,所以可以提高此参数来减少合并次数。

mapreduce.reduce.shuffle.parallelcopies #把map输出复制到reduce的线程数,默认5; #我司调了mapreduce.reduce.shuffle.parallelcopies=100mapreduce.reduce.shuffle.read.timeoutdefault180000秒)#reduce下载线程的这个最大的下载时间段, #当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。mapreduce.task.io.sort.factor #排序文件时一次最多合并文件的个数mapreduce.reduce.shuffle.input.buffer.percent #在shuffle的复制阶段,分配给map输出缓冲区占堆内存的百分比,默认0.7mapreduce.reduce.shuffle.merge.percent #map输出缓冲区的阈值,用于启动合并输出和磁盘溢写的过程