Shuffle流程

Posted lyr999736

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Shuffle流程相关的知识,希望对你有一定的参考价值。

俗称:洗牌

 

1.MapReduce的执行流程(官网图)

 

  InputFormat-->InputSplit-->map函数(mapper)-->环形缓冲区-->partition(分区)-->sort(排序)-->spill to disk(溢写至磁盘)-->merge(合并)-->存储在maptask节点的本地(本地存储)-->fetch(通过http协议拉取map端的输出结果,按照partititon)-->merge(合并)将来自不同节点的map输出数据,合并成一个大的文件,供reduce使用-->reduce函数(reducer)-->output

 

2.shuffle流程(源代码分析)

 

 

1.【MapTask.class】类的分析
run()-->首先,判定是否存在Reduce阶段,通过conf.getNumReduceTasks()的值;如果无Reduce阶段,将mapPhase参数设置成1(100%),无sort阶段;
如果存在Reduce阶段,将MapTask分成两个阶段:一个是mapPhase(67%);另一个为sortPhase(33%).
  -->initialize(job, getJobID(), reporter, useNewApi);
    -->将job的state设置成RUNNING,并且验证output
  -->runNewMapper(job, splitMetaInfo, umbilical, reporter);
    -->【make a mapper(创建Mapper)】ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      -->conf.getClass(MAP_CLASS_ATTR, Mapper.class);
说明:通过反射机制获取由taskContext.getMapperClass()定义的相关Mapper类;
MAP_CLASS_ATTR = "mapreduce.job.map.class";
参照dirver类:job.setMapperClass(AirMapper.class)--> conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
    -->【make the input format(创建InputFormat)】 略!!!
    -->【获取当前的InputSplit】getSplitDetails()
    -->定义input、output
      -->通过reduce的个数,判定output路径:
        如果reduce=0,output路径由FileOutputFormat.OUTDIR决定;
        如果reduce!=0,output路径由MapOutputCollector决定;
    -->包装context上下文对象
    -->【RecordReaer的初始化方法】input.initialize(split, mapperContext);
    --> mapper.run(mapperContext);
说明:真正加载自定义的Mapper类,调用run(),循环遍历(context.nextKeyValue())
      -->context.write(new Text(year), air);
      -->【WrappedMapper.class】write(KEYOUT key, VALUEOUT value)
      -->【TaskInputOutputContextImpl】write(KEYOUT key, VALUEOUT value)
      -->【MapTask.class$NewOutputCollector<K,V>.class】write(K key, V value)
      -->【MapTask.class$MapOutputBuffer.class】collect(K key, V value, final int partition)
        -->说明:collect收集来自mapper.writer处理的k2,v2,partition
判定keyClass和ValueClass是否符合格式,确保partition数不能为复数,并且不能大于reduce数量,否则抛异常IOException;
检查spill溢写异常处理:checkSpillException()
        -->【缓冲区剩余大小计算为:阈值0.8(80%),80m】bufferRemaining=83886080(80m)
【缓冲区每个元数据默认大小】METASIZE=16=NMETA * 4;NMETA:元数据个数为4个int,每个int占4个字节
【(元数据)kvmeta:VALSTART+KEYSTART+PARTITION+VALLEN(4个int类型)】
      -->
      -->
  -->【MapTask.class$MapOutputBuffer.class】环形缓冲区
    说明:init()关键代码:
      a. mapOutputFile = mapTask.getMapOutputFile()
        -->map本地存储的目录为:
        在mapred-site.xml文件中:mapreduce.cluster.local.dir = ${hadoop.tmp.dir}/mapred/local
      b. partitions = job.getNumReduceTasks()
        --> 默认分区数取决于reduce数
      c. rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw()
        -->说明map阶段将结果写入到localFileSystem
      d. spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8)
        -->在mapred-site.xml文件中:mapreduce.map.sort.spill.percent = 0.8
      e.sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
        -->定义默认缓冲区值为100;mapreduce.task.io.sort.mb=100
f.sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",QuickSort.class, IndexedSorter.class), job);
        -->定义排序算法默认为QuickSort.class
      g. kvbuffer = new byte[maxMemUsage];
        -->定义初始化缓冲区大小
      h. kvmeta =         ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
        -->定义元数据存储大小:IntBuffer
      i.setEquator(0);
        -->设置默认原点;
        说明:flush()关键代码:
a. sortAndSpill()
  排序并溢写:

  

  1). -->Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);
    说明:创建mapoutputfile:默认路径【 mphadoop-centosmapredlocallocalRunnercentosjobcachejob_local397295198_0001attempt_local397295198_0001_m_000000_0outputspill0.out】
------------------------------Map阶段第一次sort开始---------------------------------------------------
  2). -->sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    说明:先执行sort,默认采用的是QuickSort。
  3). -->【MapOutputBuffer.class】compare(final int mi, final int mj)
    说明:排序比较:先按partition分区排序,然后再按key排序;按kvmeta(元数据)进行整体sort
  4). -->【MapOutputBuffer.class】swap(final int mi, final int mj)
    说明:转换:在metadata中进行转换
------------------------------Map阶段第一次sort结束---------------------------------------------------
  5). --> writer.append(key, value);
    说明:先按partition分区进行循环遍历,将<key,value>append至writer,最终写入到spillfile中
spill格式:p0[<k,v>:<k,v>:<k,v>...:<k,v>:<k,v>];p1[<k,v>:<k,v>:<k,v>...:<k,v>:<k,v>];p2[<k,v>:<k,v>:<k,v>...:<k,v>:<k,v>]
  6). -->spillThread.interrupt();
    说明:spill线程结束。并关闭;
  7). -->mergeParts();
    说明:开启merge阶段
    判定:当numSpills == 1;将spillfile文件重命名为file.out,并生成相应的file.out.index
当numSpills == 0;创建空文件
当numSpills > 0; 先循环遍历partition分区,然后再遍历溢写文件,将相同分区的不同spillfile的内容添加至ArrayList<Segment>中
调用Merger.merge()方法,进行合并;开启Map阶段的第二次排序
------------------------------Map阶段第二次sort开始---------------------------------------------------
说明:默认采用java集工具类的sort方法:Collections.sort(segments, segmentComparator);
Comparator<Segment<K, V>> segmentComparator =
new Comparator<Segment<K, V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}

return o1.getLength() < o2.getLength() ? -1 : 1;
}
};
------------------------------Map阶段第二次sort结束---------------------------------------------------
  8). -->write merged output to disk:将merge后的数据写入到disk

 

在reduce端通过http协议抓取到磁盘的数据,在进行merge,与之前map端的merge相同,并且进行排序,到reduce函数,最后输出。

 

 



























































































以上是关于Shuffle流程的主要内容,如果未能解决你的问题,请参考以下文章

Flink Sort-Shuffle读流程简析

FlinkFlink Sort-Shuffle写流程简析

Shuffle流程

spark shuffle流程

整个shuffle的流程图

mapreduce的shuffle机制