MapReduce ——ReduceTask阶段源码分析
Posted 三秋叶
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce ——ReduceTask阶段源码分析相关的知识,希望对你有一定的参考价值。
对ReduceTask最宏观的理解也应该包括三个阶段:数据输入(input),数据计算(reduce),数据输出(output)
下边代码是大数据开发hello world
Reduce方法:
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private int sum;
private IntWritable total = new IntWritable();
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
total.set(sum);
context.write(key,total);
}
}
自定义的WcReducer
继承了Reducer
类,进入Reducer
类中查看具体字段和方法,类头上的注释简单翻译一下并且加上自己的理解
将共享一个键的一组中间值减少到一组较小的值。
Reducer实现可以通过JobContext.getConfiguration()方法访问作业的Configuration
Reducer有 3 个主要阶段:
Shuffle
Reducer使拉取每个Mapper输出的数据,相同的key要被拉取到一个分区Sort
Reducer将对输入进行按照key归并排序(因为不同的Mapper可能输出相同的key)
shuffle 和 sort 阶段同时发生,即在获取map输出时将小文件合并。在整个mapreduce阶段只有map端是从无序到有序的过程,用的是快速排序。reduce端没有能力再改变数据的顺序了二次排序 -- 分组比较器
要对值迭代器返回的值进行二次排序,应用程序应自定义继承key并定义分组比较器。
对整个分组key进行排序,但将使用分组比较器进行分组,以确定在同一个调用中发送哪些键和值到 reduce。
分组比较器通过Job.setGroupingComparatorClass(Class)指定,排序顺序由Job.setSortComparatorClass(Class) 指定。
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
protected void setup(Context context) throws IOException, InterruptedException {}
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {}
public void run(Context context) throws IOException, InterruptedException {
// 1、根据上下文建立连接
setup(context);
try {
// 2、从上下文中取出数据
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
// 3、 释放连接
cleanup(context);
}
}
}
这里可以看出Mapper和Reducer 的 run方法有些不同:
-
Mapper::run():while (context.nextKeyValue()) {} map方法对每一条记录调用一次
-
Reducer::run():while (context.nextKey()) {} 而reduce方法是对每一组数据调用一次(相同的key为一组)
有了看MapTask的经验,进入ReduceTask
类 应该也有相应的run
方法。删除非核心代码,清清爽爽,开开心心读源码
public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
.........
// check if it is a cleanupJobTask
.........
// Initialize the codec
.........
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
// 1、着重关注这个返回值
// shuffle 过程已经把数据拉取回来了,这行代码的语义是【将拉回的数据包装成一个迭代器】
// 猜测跟迭代器设计模式有关
rIter = shuffleConsumerPlugin.run();
// free up the data structures
.........
// 2、比较器:如果没有自定义,则使用key默认的排序比较器
// 分组比较器,在map端有一个排序比较器
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
// 3、reduce方法开始运行
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
关于返回的迭代器,我们可以有这样的构思:如果我们拿到一个文件(file),接就可以将它打开(open),这样就可以将其中的一行数据读出(readLine),并且还可以判断是否有下一行数据(hasNext),并且可以获取下一行(next)。回过头来想:file在磁盘中,readLine在内存中,这样包装就可以实现在内存中操作磁盘上的文件了。大数据可能会撑爆内存,所以这个迭代器设计很巧妙。
分组比较器 可不可以复用排序比较器 ?
- 排序比较器的返回值:-1,0,1
- 分组比较器:返回值:布尔值 (T,F)
所以排序比较器可以做分组比较器 ~ ! 所以在MapTask和ReduceTask阶段可以有如下的选择。框架很灵活的兼容了我们定义的处理数据的方式.
- MapTask : 1、取用户定义的排序比较器;2、取Key自身的排序比较器 ;
- ReduceTask:1、取用户自定义的分组比较器;2、用户定义的排序比较器;3、取key自身的排序比较器
进入到runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)
方法源码。还是一样的套路,重要的方法放在try-catch
块中。
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,
Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
// 1、rawIter 这个迭代器就是数据输入
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
// make a task context so we can get the classes
// 2、创建任务上下文对象
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
// 3.创建reduce上下文。这里传入了迭代器(数据输入),迭代器被包装成了input
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter, reduceInputValueCounter,
trackedRW,committer,reporter, comparator, keyClass,valueClass);
try {
// 4.猜测调用真正reduce方法,最终调用到自定义的reduce方法
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
再回到Reducer::run()
方法 , 在while循环中一直在调用一个context.nextKey()
,那么根据前面的源码分析可知,这个操作应该是迭代器在执行。判断有无下一条数据,然后把key和value取出来,
public void run(Context context) throws IOException, InterruptedException {
// 1、根据上下文建立连接
setup(context);
try {
// 2、从上下文中取出数据
while (context.nextKey()) {
// 3、注意到:context.getValues() 【后边会分析】
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
// 4、 释放连接
cleanup(context);
}
}
方法调用关系:Reducer::context.nextKey()
-> ReduceContextImpl::nextKey()
-> ReduceContextImpl::nextKeyValue()
// ----------------------ReduceContextImpl::nextKey() begin---------------------------
public boolean nextKey() throws IOException,InterruptedException {
// 1、有下一条记录,并且下一条的key和当前key相同,则读取下条k-v
// nextKeyIsSame 的默认值为false 所以不第一次调用不进入
while (hasMore && nextKeyIsSame) { // hasmore = input.next();
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
//2、再接着调用了nextKeyValue();
// 也就是说在调nextKey()时候会调用 nextKeyValue()
return nextKeyValue();
} else {
return false;
}
}
// ----------------------ReduceContextImpl::nextKey() end----------------------------
// ----------------------ReduceContextImpl::nextKeyValue() begin---------------------
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame;
// 1、获取key
DataInputBuffer nextKey = input.getKey();
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
//2、获取value
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition());
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
if (isMarked) {
backupStore.write(nextKey, nextVal);
}
//3、取出下条记录
hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
// 比较器:尝试比较 当前key 和 下一条数据的key 是否相同
// 下一条数据和当前数据是否为一组
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}
// ----------------------ReduceContextImpl::nextKeyValue() end-----------------------
// ----------------------ReduceContextImpl::getValues() begin------------------------
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
// 注意:返回值是一个可迭代对象,这个对象包含一个迭代器。
//和之前的迭代器不一样,这是个嵌套迭代器
// 所以:在一次调用reduce方法中才可以对同组数据进行迭代遍历,hasnext判断的是本组还有没有数据
// 归根结底还是迭代器设计模式
return iterable ;
}
// ----------------------ReduceContextImpl::getValues() end--------------------------
总结:
这里出现了两个迭代器,分别代表不同的指向。
-
input代表的rIter可以对Reducer拉取的整个数据进行迭代遍历,可以获取到下一条数据。
-
调用nextKeyValue()时,在判断有无下一条数据的时候,可以通过比较得出下一条记录是否同属于当前数据组(nextKeyIsSame:key相同的数据为一组)。
-
为真的时候会调用reduce方法,传入的参数是 key 和 values(复数形式),这个values其实就是ValueIterable 的属性ValueIterator.。
-
在Reduce方法中可以拿着这个ValueIterator 调用方法取出数据,并且调用 hasNext()可以判断出下一条数据nextKeyIsSame并更新这个值
我自己看懂就行:
ReduceContextImpl :
- input = rIter 真迭代器
- hasMore = true ;
- nextKeyIsSame = false
- iterable = ValueIterable
- iterator = ValueIterator
ValueIterable :
- iterator () return iterator 假迭代器
ValueIterator:
- hasNext() return firstValue || nextKeyIsSame
- next() nextKeyValue();
nextKey() :
- nextKeyValue()
nextKeyValue()
- 通过input获取数据,对key和value赋值
- 返回布尔值
- 多取一条数据判断更新 nextKeyIsSame ,窥探下一条是不是同一组的。
getCurrentKey():return key;
getValues(): return iterable ;
ReduceTask 拉取回来的数据被包装成一个迭代器,reduce方法被调用的时候,并没有把一组数据真的加载到内存,而是传递一个迭代器-values.在reduce方法中使用这个迭代器的时候:hasNext方法判断nextKeyIsSame . next方法 :调用nextKeyValue方法,从reduceTask级别的迭代器中读取数据,并且更新nextKeyIsSame 。
双迭代器设计的艺术:充分利用了迭代器设计模式,规避了内存OOM问题。MapTask输出的数据分区内有序,所以两个迭代器协作可以在一次I/O中线性的处理完一组数据。
以上是关于MapReduce ——ReduceTask阶段源码分析的主要内容,如果未能解决你的问题,请参考以下文章