Hadoop学习--MapReduce流程详解
Posted 是渣渣呀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop学习--MapReduce流程详解相关的知识,希望对你有一定的参考价值。
详细流程图
(from:尚硅谷)
大致流程
注:
- 在mapper类被调用之前的活动都是由 InputFormat的类型来决定具体的执行策略的!
1.InputSplit切片阶段
- InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定),默认情况下与block一样大。
- split的个数决定了MapTask的个数!!
根据InputFormat的类型进行切片(比如默认的TextInputFormat,CombineTextInputFormat等)
- TextInputFormat切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
举个例子:
file1.txt 320M
file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
- CombineTextInputFormat切片机制:
- 虚拟存储切片最大值设置: CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值 - 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切
片。 - 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片
- 虚拟存储切片最大值设置: CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
2.RecordReader阶段
无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;
- 系统默认的RecordReader是LineRecordReader,TextInputFormat;LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;
- SequenceFileInputFormat的RecordReader是SequenceFileRecordReader;
应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。
RecorderReader是一个接口,主要是用来读取文件的输入键值对的,我们也可以自定义输入的key,value对的读取规则。属于split和mapper之间的一个过程,将inputsplit输出的一个记录,转换成为key-value的记录形式提供给mapper
3.Map阶段
经过RecorderReader后,Mapper任务会接收输入的分片,然后不断的调用map()方法,对记录进行处理,处理完毕后,转换为新的<key,value>输出(其中每一个分片对应一个map,一个map方法可以被调用多次来处理分片)
简单描述一下mapper类的结构
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
/*
1. LongWritable, Text 是切片经过RecordReader后的key,value对
2. Text, IntWritable 是map后输出的key,value类型
*/
Text k = new Text();
IntWritable v = new IntWritable(1);
/*
1. LongWritable key, Text value 切片经过RecordReader后的key,value对,这里是默认的LineRecordReader,所以就是每行调用一次map
2. 我们只需要对每行的内容value进行处理即可,不必关心key!
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
4.Shuffle阶段
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
将map的输出作为输入传给reducer 称为shuffle
4.1 mapper输出写入缓存,并溢写到本地磁盘
(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
溢写(spill):
Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中, 缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。 当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill!
溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,当内存中buffer in memory(内存)当达到阈值,(默认的80%),就会把记录溢写到磁盘文件中,然后用剩余的20%来继续接受数据
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的
次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。
(3)多个溢出文件会被合并成大的溢出文件
(4)==在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序!==
4.2 分区Partitioner
Partitioner的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理,目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition(一个int类型的值)来读取自己对应的数据。
注:默认是HashPartitioner
此hash采用hash(key.hashcode()& Integer.MAX_VALUE)% munReduceTasks 有多少个分区就会有多少个reducetask
默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力。
4.3 排序sort
在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。
注:要想自定义key的排序规则,要让key的类实现WritableComparable接口并重写compareTo方法才行。
4.4 combine(可选)
注:可以看到,其实在combine之前有过一次MergeSort的操作,即将map的输出里相同key值的value归并到一个集合中。
通俗易懂的说,例如有两个键值对 <“a”,1> 和 <“a”,1>,
如果合并,会得到<“a”,2>,
如果归并,会得到<“a”,<1,1>>。
再以一个自定义的combiner为例
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 汇总操作
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
可以看到,combiner是和reducer一样的有Iterable values这个参数,就是因为相同key值的value已经被归并到一个集合中了!
combine就是规约操作,其本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少,通过对map输出的数量进行规约,可以减少reduce的数量,提高执行效率。combiner的输入输出类型必须和mapper的输出以及reducer的输入类型一致
注:combiner是对每个maptask(分区)里的数据进行操作,所以不能用来取平均数,但可以用来取最值,而后面的reducer是取所有的数据!!!
4.5 归并排序
百度百科:
归并排序(MERGE-SORT)是建立在归并操作上的一种有效的排序算法,该算法是采用分治法(Divide and Conquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为二路归并。归并排序是一种稳定的排序方法。
目前,我的理解就是执行MergeSort,把键值对组中许多个key相同的键值对的value放到一个集合里,然后并为一个键值对里的key,values(所以之后reducer里的类型为Iterable<?> values)
过渡
MRAppMaster
是MapReduce的ApplicationMaster实现,它使得Map
Reduce可以直接运行在YARN上,它主要作用在于管理作业的生命周期
-
MRAppMaster在所有MapTask任务完成后,启动相数量的ReduceTask,并告知reduceTask处理数据范围(数据分区)
-
ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
-
ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进
行合并(归并排序) -
合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程
5.Reduce阶段
从文件中取出一个一个的键值对Group,对已排序输出的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为HDFS以实现可靠存储。
对reducer类进行一些简单地说明(注释)
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
/*
1. FlowBean, Text 指的是map输出的key,value类型
2. Text, FlowBean 指的是reduce输出的key,value类型
*/
/*
FlowBean key, Iterable<Text> values
1. FlowBean key指的是map输出的key
2. Iterable<Text> values是对同一个key归并排序之后的value
*/
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 循环输出,避免总流量相同情况
for (Text text : values) {
context.write(text, key);
}
}
}
以上是大数据小白对mapreduce流程大致的理解,可能会有错误或者不全面的地方 …^ _ ^
以上是关于Hadoop学习--MapReduce流程详解的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop学习之路(二十三)MapReduce中的shuffle详解
大数据之Hadoop(MapReduce): MapReduce框架原理
大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)(示例