MapReduce工作机制
Posted helloemk
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce工作机制相关的知识,希望对你有一定的参考价值。
1、MapTask机制:
(1)Read 阶段:客户端获取输入数据信息,根据配置文件形成一个任务分配规划(形成InputSplit),然后submit()方法提交job。AppMaster通过用户编写的 RecordReader,从InputSplit中用InputFormat(分为TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义的InputFormat)解析(默认用的TextInputFormat)出Key-value键值对。
(2)map阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。
(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。
步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。
(5)Combine 阶段:当所有数据处理完成后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index,以确保最终只会生成一个数据文件。(让每个 MapTask 最 终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销)。在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认 100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
2、Shuffle机制:
(1)Shuffle:Mapreduce 确保每个 reducer 的输入都是按键排序的。系统执行排序的过程(即将 map输出作为输入传给 reducer)称为 shuffle。
(2)Partition 分区:默认 partition 分区方法(HashPartitioner()是根据 key 的 hashCode 对 reduceTasks 个数取模得到的。用户没法控制哪个key 存储到哪个分区)。
自定义partitioner步骤:
1)自定义类继承 Partitioner,重写 getPartition()方法。设置分区个数,并根绝业务要求分到不同的partition。
2)在 job 驱动中,设置自定义 partitioner:job.setPartitionerClass(CustomPartitioner.class);
3)自定义 partition 后,要根据自定义 partitioner 的逻辑设置相应数量的 reduce task:job.setNumReduceTasks(5);
注意:如果 reduceTask 的数量> getPartition 的结果数,则会多产生几个空的输出文件;如果 1<reduceTask 的数量<getPartition 的结果数,会报错;如果 reduceTask 的数量=1,则不管 mapTask 端输出多少个分区文件分区文件,最终结果都交给这一个 reduceTask,最终也就只会产生一个结果文件 part-r-00000;
(3)WritableComparable 排序:Map Task 和 Reduce Task 均会对数据(按照 key的字典顺序,排序方法为快速排序)进行排序。
1)排序的分类
1)部分排序:MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序。(MapTask只是对缓冲区的数据进行排序,所以是部分排序)
2)全排序:实现全排序 方法1:只需要设置一个分区就可以(同一个分区进行一次排序),但是在处理大文件时,只用一个分区效率极低,所以不可行。
方法2:首先创建一系列排好序的文件,然后串联这些文件,最后按顺序生成一个全排序的文件。(a-g:文件1,h-s:文件2,t-z:文件3,最后依次将1,2,3文件中的数据读取到一个文件中,实现全排序)。
3)辅助排序:GroupingComparator 分组
Mapreduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
4)二次排序:在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。
2)自定义排序 WritableComparable:bean 对象实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。
(4)GroupingComparator 分组 (辅助排序):对 reduce 阶段的数据根据某一个或几个字段进行分组。
(5) Combiner 合并:combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件,父类是Reducer(区别在于运行位置不一样:Combiner运行于每一个maptask的节点,Reducer接收全局map阶段的输出结果)。
combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量。
combiner 能够应用的前提是不能影响最终的业务逻辑,而且,combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。
自定义 Combiner 实现步骤:
(1)自定义一个 combiner 继承 Reducer,重写 reduce 方法
(2)在 job 驱动类中设置:job.setCombinerClass(WordcountCombiner.class);
3、ReduceTask 工作机制
(1)设置 ReduceTask 并行度(个数):reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);
(2)注意:
1)reducetask=0 ,表示没有 reduce 阶段,输出文件个数和 map 个数一致。
2)reducetask 默认值就是 1,所以输出文件个数为一个。
3)如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。
4)reducetask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个 reducetask。
5)具体多少个 reducetask,需要根据集群性能而定。
6)如果分区数不是 1,但是 reducetask 为 1,是否执行分区过程。答案是:不执行分区过程。因为在 maptask 的源码中,执行分区的前提是先判断 reduceNum 个数是否大于 1。不大于 1 肯定不执行。
(3)ReduceTask 工作机制
1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
3)Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。
4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
4、OutputFormat 数据输出
(1)OutputFormat 接口:OutputFormat 是 MapReduce 输出的基类,所有实现 MapReduce 输出都实现了OutputFormat 接口。常见的 OutputFormat 实现类:
1)文本输出 TextOutputFormat:默认的输出格式是 TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为 TextOutputFormat 调用 toString()方法把它们转换为字符串。
2)SequenceFileOutputFormat:SequenceFileOutputFormat 将它的输出写为一个顺序文件。如果输出需要作为后续MapReduce 任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
3)自定义 OutputFormat:根据用户需求,自定义实现输出。
自定义 OutputFormat 步骤:(1)自定义一个类继承 FileOutputFormat。(2)改写 recordwriter,具体改写输出数据的方法 write()。
5、Join 多种应用
(1)Reduce join
1) 原理:Map 端的主要工作:为来自不同表(文件)的 key/value 对打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。Reduce 端的主要工作:在 reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 map 阶段已经打标志)分开,最后进行合并就 ok 了。
2 )该方法 的缺点:这种方式的缺点很明显就是会造成 map 和 reduce 端也就是 shuffle 阶段出现大量的数据传输,效率很低。
3)实例操作:reduce 端表合并(数据倾斜)
(2)Map join( ( Distributedcache 分布式缓存 )
1)使用场景:一张表十分小、一张表很大。
2)解决方案:在 map 端缓存多张表,提前处理业务逻辑,这样增加 map 端业务,减少 reduce 端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用 distributedcache
(1)在 mapper 的 setup 阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 缓存普通文件到 task 运行节点。
4)实操案例:map 端表合并(Distributedcache)
6、数据清洗(ETL)
(1)概述:在运行核心业务 Mapreduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行 mapper 程序,不需要运行 reduce 程序。
(2)实例:日志清洗(数据清洗)
7、数据压缩
(1)鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O 和网络传输非常有帮助,但是会占用cpu计算能力
(2)运算密集型的 job,少用压缩;IO 密集型的 job,多用压缩
(3)压缩方式选择:
1 )Gzip 压缩:
优点:压缩率比较高,而且压缩/解压速度也比较快;hadoop 本身支持,在应用中处理gzip 格式的文件就和直接处理文本一样;大部分 linux 系统都自带 gzip 命令,使用方便。
缺点:不支持 split。
应用场景:当每个文件压缩之后在 130M 以内的(1 个块大小内),都可以考虑用 gzip压缩格式。例如说一天或者一个小时的日志压缩成一个 gzip 文件,运行 mapreduce 程序的时候通过多个 gzip 文件达到并发。hive 程序,streaming 程序,和 java 写的 mapreduce 程序完全和文本处理一样,压缩之后原来的程序不需要做任何修改。
2) Bzip2 压缩
优点:支持 split;具有很高的压缩率,比 gzip 压缩率都高;hadoop 本身支持,但不支持 native;在 linux 系统下自带 bzip2 命令,使用方便。
缺点:压缩/解压速度慢;不支持 native。
应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为 mapreduce 作业的输出格式;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持 split,而且兼容之前的应用程序(即应用程序不需要修改)的情况。
3 )Lzo 压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持 split,是 hadoop 中最流行的压缩格式;可以在 linux 系统下安装 lzop 命令,使用方便。
缺点:压缩率比 gzip 要低一些;hadoop 本身不支持,需要安装;在应用中对 lzo 格式的文件需要做一些特殊处理(为了支持 split 需要建索引,还需要指定 inputformat 为 lzo 格式)。
应用场景:一个很大的文本文件,压缩之后还大于 200M 以上的可以考虑,而且单个文件越大,lzo 优点越越明显。
4 )Snappy 压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持 split;压缩率比 gzip 要低;hadoop 本身不支持,需要安装;
应用场景:当 Mapreduce 作业的 Map 输出的数据比较大的时候,作为 Map 到 Reduce的中间数据的压缩格式;或者作为一个 Mapreduce 作业的输出和另外一个 Mapreduce 作业的输入。
(4)压缩位置的选择
1)输入端采用压缩:在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。然而,你无须显示指定 使 用 的 编 解 码 方 式 。Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。否则,Hadoop就不会使用任何编解码器。
2)map端输出采用压缩:当map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技 术。 这能显 著改 善内 部数 据Shuffle 过 程 , 而 Shuffle 过 程 在Hadoop处理过程中是资源消耗最多的环节。如果发现数据量大造成网络传输缓慢,应该考虑使用压缩技术。可用于压缩mapper输出的快速编解码器包括LZO或者Snappy。
注意:LZO是供Hadoop压缩数据用的通用压缩编解码器。其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,而不是压缩率。与gzip编解码器相比,它的压缩速度是gzip的5倍,而解压速度是gzip的倍。同一个文件用LZO压缩后比用gzip压缩后大50%,但比压缩前小25%~50%。这对改善性能非常有利,map阶段完成时间快4倍。
3)reduce端输出采用压缩:在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间。当mapreduce作业形成作业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效。
(5)压缩配置 参数
1)io.compression.codecs(在core-site.xml中配置)阶段:输入压缩 作用:Hadoop 使用文件扩展名判断是否支持某种编解码器
2)mapreduce.map.output.compress (在 mapred-site.xml中配置) 阶段:mapper输出 默认:false 作用:设为true是为mapper开启压缩。
3)mapreduce.map.output.compress.codec ( 在mapred-site.xml 中配置) 阶段:mapper输出 作用:使 用LZO 或snappy编解码器在此阶段压缩数据
4)mapreduce.output.fileoutputformat.compress ( 在mapred-site.xml 中配置) 阶段:reduce输出 默认:false 作用:设为true开启reducer端压缩
5)mapreduce.output.fileoutputformat.compress.codec (在mapred-site.xml 中配置) 阶段:reduce输出 作用:使用标准工具或者编解 码器,如gzip 和bzip2
6)mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml 中配置) 阶段:reduce输出 作用:SequenceFile输出使用的压缩 类型 :NONE和BLOCK
以上是关于MapReduce工作机制的主要内容,如果未能解决你的问题,请参考以下文章
MapReduce 框架原理MapReduce 工作流程 & Shuffle 机制
大数据之Hadoop(MapReduce):shuffle之ReduceTask工作机制
Hadoop--09---MapReduce_04----MapReduce工作流程Shuffle 机制Partition 分区