hadoop-MapReduce框架原理之Shuffle机制

Posted wnwn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop-MapReduce框架原理之Shuffle机制相关的知识,希望对你有一定的参考价值。

1.Shuffle机制

  1.1 什么是shuffle机制

    1.1.1 在hadoop中数据从map阶段传递给reduce阶段的过程就叫shuffle,shuffle机制是整个MapReduce框架中最核心的部分;

    1.1.2 shuffle翻译成中文的意思为:洗牌,发牌(核心机制:数据分区,排序,缓存)

  1.2 shuffle的作用范围

    一般把数据从map阶段输出到reduce阶段的过程叫shuffle,所以shuffle的作用范围是map阶段数据输出到reduce阶段数据输入这一整个中间过程;

  1.3 shuffle图解

    技术图片

 

    shuffle并不是hadoop的一个组件,只是map阶段产生数据输出到reduce阶段取得数据作为输入之前的一个过程;

  1.4 shuffle的执行阶段流程

    1.4.1 collect阶段:将mapTask的结果输出到默认大小为100的环形缓冲区,保存的是key/value序列化数据,partition分区信息等;

    1.4.2 spill阶段:当内存中的数据量达到一定阈值的时候,就会将数据写入本地磁盘中,在将数据写磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序;

    1.4.3 merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个mapTask最终值产生一个中间数据文件;

    1.4.4 copy阶段:reducetask启动fetcher线程到已经完成mapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存缓冲区中,当内存的缓冲区达到一定阈值的时候,就会将数据写入磁盘中;

    1.4.5 merge阶段:在reducetask远程复制数据的同时,会在后台开启两个线程(一个是内存到磁盘合并,一个是磁盘到磁盘合并)对内存到本地的数据文件进行合并操作;

    1.4.6 sort阶段:在对数据进行合并的同时,会进行排序操作,由于mapTask阶段已经对数据进行了局部排序,reducetask只需保证copy的数据的最终整体有效性即可;

  1.5 注意

    shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘IO的次数越少,执行速度就越快,正是因为shuffle的过程中要不断的将文件从磁盘写入到内存,再从内存写入到磁盘,从而导致了hadoop中MapReduce执行效率相对于storm等一些实时计算来说比较低下的原因;

2.Partition分区

  2.1 默认Partitioner分区

@Public
@Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
    public HashPartitioner() {
    }

    public void configure(JobConf job) {
    }

    public int getPartition(K2 key, V2 value, int numReduceTasks) {
        // key的hashcode值与Integer的最大值 在和reduceTask数量求余
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

    默认分区是根据key的hashCode值对ReduceTasks的个数取模得到的;用户没法控制哪个key存储到哪个分区中;

  2.2 分区总结

    (1)如果reduceTask的数量>getPartition的结果数,则会产生几个空的输出文件part-r-000xx;

    (2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会发生异常;

    (3)如果ReduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就会产生一个结果文件part-r-00000;

    (4)分区号必须从零开始,逐一累加;

3.Partition分区案例实操

  MyPartitioner类编写

package com.wn.partition;

import com.wn.flow.FlowwBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowwBean> {
    @Override
    public int getPartition(Text text, FlowwBean flowwBean, int i) {
        String phone = text.toString();
        switch (phone.substring(0,3)){
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;

        }
    }
}

  PartitionDriver类编写

package com.wn.partition;

import com.wn.flow.FlowMapper;
import com.wn.flow.FlowReducer;
import com.wn.flow.FlowwBean;
import com.wn.wordcount.WcDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取一个Job实例
        Job job = Job.getInstance(new Configuration());

        //设置类路径
        job.setJarByClass(PartitionDriver.class);

        //设置mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //设置分区
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartitioner.class);

        //设置mapper和reducer输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowwBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowwBean.class);

        //设置输入的数据
        FileInputFormat.setInputPaths(job,new Path("E:\\input"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\output"));

        //提交job
       boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }

}

4.WritableComparable排序

  排序是MapReduce框架中最重要的操作之一;

  mapTask和reducetask均会对数据按照key进行排序;该操作属于hadoop的默认行为;任何应用程序中的数据均会被排序,而不管逻辑上是否需要;

  默认排序是按照字典顺序排序,且实现该排序的方法是快速排序;

 

5.WritableComparable排序案例实操()

6.WritableComparable排序案例实操()

7.Combiner合并

8.Combiner合并案例实操

9.GroupingComparator分组(辅助。。。)

10.GroupingComparator分组案例实操

以上是关于hadoop-MapReduce框架原理之Shuffle机制的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop-MapReduce

Hadoop- Wordcount程序原理及代码实现

Hadoop-MapReduce

Hadoop-MapReduce

linux下shuf命令随即排序shuf -n1 a.txt

Hadoop-MapReduce