MapReduce的执行流程及优化

Posted JeRomeee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce的执行流程及优化相关的知识,希望对你有一定的参考价值。

MapReduce的执行流程及优化

执行流程:

  1. 程序在调用submit()方法之前,会生成一份任务分配规划。
  2. 提交数据到RecourseManager(RM),包含一份job.split 文件和job.xml文件和jar包
  3. RM根据提交的资源文件,分配任务,根据切片的个数启动相应的mapTask任务个数。
  4. mapTask任务根据程序设置的文件读取类去读文件数据,比如TextFileInputFormat,底层是调用RecorderReader组件的reader()方法去读取数据。
  5. 数据读取进入map方法,执行我们的业务逻辑,最后写出。
  6. 数据写出时,先经过分区方法,标记好数据所属的分区,再进入环形缓冲区。
  7. 当环形缓冲区的数据量大小达到阈值(默认80%)时,数据溢写到磁盘文件。
  8. 在溢写之前,会根据按照字典顺序对key进行快速排序。
  9. 当分区溢写到磁盘的文件数量达到默认的10个时,会进行merge操作,把10个文件合并为1个。在合并的过程中会做归并排序。
  10. 如果配置了combiner,则在归并之前执行combiner合并。
  11. 当所有的mapTask任务执行完成,则根据分区个数启动对应数量的reduceTask任务。
  12. reduceTask任务按照分区从mapTask中拉去对应的分区数据。
  13. reduceTask拉去的分区数据,先进入内存中存储,当内存不足时,溢写到磁盘文件。
  14. 当该reduceTask拉取完数据后,会进行合并文件,归并排序,分组(如果有combiner)。
  15. 数据进入reduce方法,执行业务逻辑。
  16. 最后数据通过RecorderWirterwirte方法写出到文件系统等。

关于combiner的详细描述:点击打开链接

优化:

mr程序主要是基于磁盘的,效率比较低,当我们需要执行mr程序时,有很多需要调优的地方。

  1. mr处理的数据上来观察,如果处理的数据存在大量小文件,可能会启动大量的mapTask任务,这样会频繁的抢占集群资源。可以使用ConbineFileInputFormat来读取数据。
  2. map:增大环形缓冲区大小,由原来的默认100M调大到200M;增大溢写的阈值,由默认的80%调大到90%;增大合并的文件个数,由默认的10个执行合并,增大为20个;如果在不影响最终结果的情况下,设置conbiner,减少数据量,减少io。
  3. reduce:增大reduce端内存大小,减少溢写磁盘次数;合理设置reduce个数;增大reduce拉取数据的并行数;如果可以的情况,规避使用reduce端。
  4. 宏观上:调大mapreduce任务的内存,默认1G调大到4G(默认数据块是128M,这么大的数据块用1G基本上可以处理,但是如果文件再大的话我们可以调整内存。);增大mapreduce任务的cpu数;增加每个container的内存和cpu大小;调整mapTaskreduceTask的最大重试次数。
  5. 压缩:在map端输出数据时,可以使用压缩,减少磁盘I/O,这个地方需要速度快的压缩方式,不需要分片,可以采用snappy压缩;对应链式的mr程序,那么在reduce端输出数据时,可以考虑使用bizp2lzo压缩。

[Hadoop]浅谈MapReduce原理及执行流程

MapReduce


 

  • MapReduce原理非常重要,hive与spark都是基于MR原理
  • MapReduce采用多进程,方便对每个任务资源控制和调配,但是进程消耗更多的启动时间,因此MR时效性不高。适合批量,高吞吐的数据处理。Spark采用的是多线程模型。

MapReduce执行流程

技术分享图片

Map过程

  • map函数开始产生输出时,并不是直接将数据写到磁盘,它利用缓冲的方式写到内存。每个map任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲区大小为100MB。一旦缓冲内容达到阈值(默认80%),便把数据溢出(spill)到磁盘。

Partition过程

  • 在map输出数据写入磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区,这个过程即为partition。

技术分享图片

传统hash算法
  • hash()%max 括号内随机取数,这样会随机分配到1-max服务器上
一致性hash算法

技术分享图片

  • 一致性哈希算法的优点:形成动态闭环调节,如果有一台服务器出现问题,例如图中B服务器出现问题,A和C可以代替其承担。

Partition的作用

  • 对于spill出的数据进行哈希取模,原来数据形式(key, value),取模后变成(partition,key, value)
  • reduce有几个partition就有几个
  • 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。

HDFS中block

  • 文件存储在HDFS中,每个文件切分成多个一定大小(默认64M)的block(默认3个备份)存储在多个节点(DataNode)上
  • block的修改:hdfs-site.xml配置文件中修改dfs.block.size的值

Shuflle

  • shuffle是MapReduce的“心脏”,是奇迹发生的地方
  • Shuflle包括很多环节:partition sort spill meger combiner copy memery disk

以上是关于MapReduce的执行流程及优化的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop MapReduce介绍官方示例及执行流程Apache Hadoop概述

MapReduce的原理及执行过程

Hadoop MapReduce介绍官方示例及执行流程

MapReduce处理流程

Hive2优化参数

mapreduce工作流程