Hadoop[3] MapReduce理论详解

Posted 热爱生活的小熊猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop[3] MapReduce理论详解相关的知识,希望对你有一定的参考价值。

 Hadoop系列  03-MR理论详解之shuffle

前面的两篇文章简单解释了一下MapReduce干了什么,但是作为一个严谨的技术人员肯定要上升到理论层次,并且要结合Hadoop框架进行实际的操作才能真正理解MapReduce。从本文开始,我将基于自己浅薄的理解和文档系统的介绍MapReduce,主要可以分为以下几点:

  • MapReduce计算框架/流程:Map、Reduce和Shuffle
  • MapReduce的Shuffle细节:Combiner、Partitioner
  • Hadoop Streaming介绍及参数配置

本文重点介绍MapReduce计算框架的工作流程:Map、Reduce和Shuffle。


编者按  由于MR的详细理论过于复杂,且细节较多,本文肯定有错误之处,如果在阅读中发现有错误内容欢迎私信骚扰。

1 MapReduce概述

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

1.1 MapReduce计算框架

一个Map/Reduce _作业(job)_通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给_reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了_作业配置(job configuration)_。然后,Hadoop的 _job client_提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

Note  虽然Hadoop框架是用Java实现的,但是Map/Reduce应用程序不一定需要使用Java编写。如Hadoop Streaming就是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (如Shell, python)来实现mapper和reducer。

下面借助下图解释一下Hadoop计算框架中的各个部分:

  • Client:即普通用户,可以提交任务、查看任务
  • JobTracker:有且只有一个,就像一个主机负责初始化作业、分配作业、与TaskTracker通信、协调整个作业的执行等等。
  • TaskTracker:负责执行保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务。TaskTracker 的责任是发送进度报告到JobTracker,周期性地发送“心跳”信号信息给 JobTracker 以便通知系统它的当前状态。在任务失败的情况下,JobTracker 可以在不同的 TaskTracker 重新调度它。

另外  除了上面的几个部分还有TaskScheduler,顾名思义就是MapReduce中的任务调度器。在MapReduce中,JobTracker接收JobClient提交的Job,将它们按 InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务。然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务。具体应该分派一些什么样的任务给这台TaskTracker,这就是TaskScheduler所需要考虑的事情。

1.2 MapReduce计算流程

MapReduce计算模型主要由三个阶段构成:Map、Shuffle和Reduce。其中Shuffle不需要我们操作,框架已实现,但了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。Map是映射过程,负责将原始数据转化为键值对;Reduce是合并过程,将具有相同key值的value进行处理后再输出新的键值对作为最终结果;为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,这个过程就是Shuffle。

官方给出的MR过程中shuffle&sort过程如下图所示,这个图基本解释清楚了shuffle的流程,但是对于本来就不懂shuffle的人来说还是有点难以理解,在本文后面的内容以及下一篇文章中我将尽可能的解释清楚shuffle细节。Hadoop[3] MapReduce理论详解


上图中只涉及到了一个map和一个reduce,可能无法看清全貌,下图从全局的角度解释MapReduce的工作流程。Hadoop[3] MapReduce理论详解

从上面两幅图可以得到如下信息:

  • 一个完整的MR流程应该包括:从HDFS读取文件 - 切片split - map - [partition - spill - combine - sort - copy - merge] - reduce - 写入HDFS;(其中[]中的内容是shuffle部分,而且可变)
  • shuffle阶段是一个非常复杂的过程;

关于map和reduce的工作内容很直观了,之前的文章也解释过了,本文不再细述,下文重点解释一下shuffle。

2 Shuffle概述

虽说shuffle由框架实现,但了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序,比如可以根据业务情景考虑是否能够对Combiner,Partitioner进行优化,这部分内容将会在下一篇文章中介绍。

Shuffle有洗牌的意思,在MR里面可以理解为将map端的无规则输出按指定的规则整理成具有一定规则的数据,以便reduce端接收处理。其在MapReduce中所处的工作阶段是map输出后到reduce接收前,具体可以分为map端「map shuffle」和reduce端「reduce shuffle」两个部分。

在shuffle之前,也就是在map阶段,MapReduce会对要处理的数据进行分片(split)操作,为每一个分片分配一个MapTask任务。接下来map()函数会对每一个分片中的每一行数据进行处理得到键值对(key,value),此时得到的键值对又叫做“中间结果”,shuffle阶段的作用是处理“中间结果”,下图形象地展示了shuffle阶段的工作内容(图片来自网络):Hadoop[3] MapReduce理论详解由于直接解释map和reduce的内容没太大意义,而且不利于把握MR的整体工作流程,下面结合shuffle介绍map和reduce的内容。

下面先完整的解释一遍shuffle流程,本段内容摘自网络。

因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,在写入的过程中进行分区(partition),也就是对于每个键值对来说,都增加了一个partition属性值,然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)。当写入的数据量达到预先设置的阙值后(mapreduce.map.io.sort.spill.percent,默认0.80,或者80%)便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操作)。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的目录中。当整个map任务完成溢出写后,会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。至此,map端shuffle过程结束,接下来等待reduce task来拉取数据。对于reduce端的shuffle过程来说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge最后合并成一个分区相同的大文件,然后对这个文件中的键值对按照key进行sort排序,排好序之后紧接着进行分组,分组完成后才将整个文件交给reduce task处理。

3 Map shuffle

介绍几个可能会用到的术语:

  • block:分区,block是HDFS中的基本存储单位,hadoop1.x默认大小为64M而hadoop2.x默认块大小为128M;block是物理划分;
  • spilt:切片,在MapReduce中,map task只读取split,那么什么是split呢?可以将其理解为一个或多个block,split与block的对应关系可能是一对一,一对多。默认情况下,分片的大小就是HDFS的blockSize(128M);split是逻辑划分;
  • partition:分区,由于在reduce阶段,相同key值的键值对会被分发给同一个reducer,而且要尽可能地保持各个reducer的负载均衡,如何实现呢?这就是partitioner的功能了,partitioner会将key映射到不同的reducer,即不同的分区(分区数等于reducer的数量);
  • combine:翻译为合并吧,其功能是在map shuffle阶段先对map中的局部数据做reduce以减少网络传输的压力,但并不是所有场景都能使用combiner。

Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认)。

map shuffle的工作流程

  1. 每个map从split读取数据,并输出键值对;
  2. 对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理;
  3. 接下来,需要将数据写入内存缓冲区中(这是一个环形buffer),缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区,缓冲区默认大小为100M;
  4. 当缓冲区中的数据超过溢出值(默认情况下为0.8),即缓冲区中的数据量超过缓冲区大小的80%就需要将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。当溢写线程启动后,需要对这80MB空间内的key做排序(Sort);
  5. 每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task全部完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。
  6. 当整个map任务完成溢出写后,会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量较多,可以进行多次归并。
  7. 至此,map端shuffle过程结束,接下来等待reduce task来拉取数据。

3 Reduce shuffle

reduce端的shuffle过程相对比较简单,reduce task在执行之前的工作就是从各个map任务中提取自己的那一部分数据(对应的partition),然后对从不同地方拉取过来的数据不断地做merge最后合并成一个大文件。如果待归并的文件过多,也会存在多次归并的情况,如下图所示是50个map输出文件,合并因子配置为10的情况,需要5轮的合并:

注意  数据被reduce读取完成之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做,因此map输出数据是在整个作业完成之后才被删除掉的。

Hadoop[3] MapReduce理论详解一般来说上面的copy&merge阶段完成之后就会输出一个整体有序的数据块,之后将整个文件交给reduce task处理,reduce将处理完的结果写入HDFS。

5 MapReduce实例

以wordCount为例,整个input-split-map-shuffle-reduce-output的过程如下图所示(图片来自网络)。

再来一张好图助你理解MR整个过程

参考链接

  • https://blog.csdn.net/qq_33429968/article/details/76944005
  • http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html
  • https://blog.csdn.net/mmake1994/article/details/87692275
  • https://www.cnblogs.com/noures/archive/2012/07/13/2590246.html
  • https://blog.csdn.net/ASN_forever/article/details/81233547
  • https://blog.csdn.net/u014374284/article/details/49205885
  • https://blog.csdn.net/bingduanlbd/article/details/51933914


以上是关于Hadoop[3] MapReduce理论详解的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop集群(第6期)_WordCount运行详解

原创 Hadoop&Spark 动手实践 3Hadoop2.7.3 MapReduce理论与动手实践

hadoop2-MapReduce详解

Hadoop MapReduce 一文详解MapReduce及工作机制

Hadoop学习之路(二十三)MapReduce中的shuffle详解

大数据之Hadoop(MapReduce): MapReduce框架原理