3. Hadoop MapReduce

Posted 小K学大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3. Hadoop MapReduce相关的知识,希望对你有一定的参考价值。

MapReduce

概述

MapReduce是一个分布式的运算程序编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,然后并发运行在Hadoop集群上

优点

  1. 易于编程
    • 可以简单的通过增加机器扩展计算能力
  2. 高容错性
    • 如果其中一个节点失去连接,可以将上面的任务迁移到另外的一个节点上运行,不需要人工干预
  3. 适合PB级以上的海量数据的离线处理

缺点

  1. 不擅长实时计算
    • 无法向mysql一样,以毫秒级返回结果
  2. 不擅长流式计算
    • MapReduce的输入数据是静态的,而流式计算的数据输入是动态的
  3. 不擅长DAG(有向无环图)计算
    • MapReduce的数据结果都会写入磁盘
    • 如果多个应用程序串联,前一个的输出是下一个的输入,会导致大量的磁盘IO,性能低下

核心思想

  1. Map阶段 ---> MapTask并发实例,并行运行。
  2. Reduce阶段 ---> ReduceTask并发运行,ReduceTask的输入是MapTask的输出
  3. MapReduce只能包含一个Map阶段和一个Reduce阶段。如果需要只能编写多个MapReduce程序,串行运行

MapReduce进程

  1. MrAppMaster
    • 负责整个程序的过程调度及状态协调
  2. MapTask
    • 负责Map阶段的整个数据处理流程
  3. ReduceTask
    • 负责Reduce阶段的整个数据处理流程

编程规范

Hadoop序列化基本类型

  1. BooleanWritable
  2. ByteWritable
  3. IntWritable
  4. FloatWritable
  5. LongWritable
  6. DoubleWritable
  7. Text ---> String
  8. MapWritable
  9. ArrayWritable
  10. NullWritable

WordCount案例

  1. Mapper
public class WCMapper extends Mapper<LongWritableTextTextIntWritable>{
  Text outK = new Text();
  IntWritable outV = new IntWritable(1);
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
    String s = value.toString();
    String[] strs = s.split("\t");
    for (String str : strs){
      outK.set(str);
      context.write(outK, outV);
    }
  }
}
  1. Reducer
public class WCReducer extends Reducer<TextIntWritableTextIntWritable>{
  int sum;
  IntWritable outV = new IntWritable();
  
  @Override
  protexted void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
    sum = 0;
    for (IntWritable value : values){
      sum += value.get();
    }
    outV.set(sum);
    context.write(key, outV);
  }
}
  1. Driver
public class WCDriver{
  public static void main(String[] args){
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    
    job.setJarByClass(WCDriver.class);
    job.setMapperClass(WCMapper.class);
    job.setReducerClass(WCReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("..."));
    FileOutputFormat.setOutputPath(job, new Path("..."));
    job.waitForCompletion(true);
  }
}

Hadoop序列化

由于MapTask和ReduceTask之间数据传输需要涉及磁盘IO(持久化),因此需要将自定义Bean对象进行持久化,实现Writable接口。

Writable Bean对象

public class PhoneBean implements Writable{
  private long upFlow;
  private long downFlow;
  private long sumFlow;
  
  public PhoneBean(){
  }
  
  //getter
  //setter
  //toString
  
  @Override
  public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(upFlow);
    dataOutput.writeLong(downFlow);
    dataOutput.writeLong(sumFlow);
  }
  
  @Override
  pubic void readFields(DataInput dataInput) throws IOException {
    this.upFlow = dataInput.readLong();
    this.downFlow = dataInput.readLong();
    this.sumFlow = dataInput.readLong();
  }
}

切片与MapTask并行度

数据块 vs. 数据切片

  1. 数据块是HDFS按照块大小把文件物理分成一块一块并存储
  2. 数据切片只是在逻辑上对输入文件进行切片,添加切片标记信息,并不会物理切片,每个数据切片会对应启动一个MapTask

切片

  1. Map阶段的并行度由客户端在提交Job时的数据切片数决定
  2. 每个Split切片会分配一个MapTask与另外的MapTask并行处理
  3. 默认情况下,Yarn环境下切片大小等于块大小(本地环境下切片大小等于32MB)
    • computeSplitSize()
  4. 切片是针对每个文件单独切片,不考虑整体
    • 每次切片都要判断剩余文件大小是否大于块大小的1.1倍
    • 小于块大小的1.1倍时直接将剩余文件划分一个切片
  5. 将切片信息写入一个切片规划文件中
    • getSplit()
    • InputSplit记录切片元数据 ---> 起始位置,长度和所在节点列表
  6. 将切片文件提交至Yarn
    • MrAppMaster根据切片规划文件计算MapTask个数

MapReduce工作流程

图解

  1. Map阶段

  2. Map步骤

    • Read阶段
      • MapTask通过InputFormat获得RecordReader,从输入InputSplit中解析出key/value
    • Map阶段
      • 解析出key/value交给map()处理,并输出新的key/value
    • Collect阶段
      • 完成数据处理完成后,调用OutputCollector.collect()输出结果,将输出的key/value分区写入一个环形内存缓冲区
    • Spill阶段
      • 当环形缓冲区存储达到阈值后,MapReduce会将数据写入本地磁盘上,生成临时文件
      • 写入本地磁盘之前,对数据进行本地排序,必要时进行数据合并、压缩的操作
    • Merge阶段
      • 所有数据处理完成后,每个MapTask对所有有关的临时文件进行合并,保证最终只生成一个数据文件
  3. Reduce阶段

  4. Reduce步骤

    • Copy阶段
      • ReduceTask从各个MapTask上远程拷贝数据,如果数据大小超过阈值,则写入磁盘,否则直接放到内存中
    • Sort阶段
      • ReduceTask会启动两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多或磁盘上文件过多
      • ReduceTask会对所有取到的数据进行一次归并排序,将key相同的数据聚合在一起,传入reduce()
    • Reduce阶段
      • reduce()将计算结果写入HDFS上

分区 Partition

将MapTask的结果按照不同条件输出到不同的文件中然后作为输入传给ReduceTask

默认分区

public class HashPartitioner<KVextends Partitioner<KV{
  public int getPartition<K key, V value, int numReduceTasks> {
    return (key.hashCode() & Inter.MAX_VALUE) % numReduceTasks;
  }
}

根据默认分区,无法根据key指定存储的分区位置,通过hashCode决定。

自定义分区

  1. 自定义类继承Partitioner,重写getPartition()
  2. Driver类中,设置自定义Partitioner
    • job.setPartitionerClass(MyPartitioner.class);
  3. 将自定义分区的数量设置为ReduceTask的数量
    • job.setNumReduceTask(numOfPartitioner);

注意事项

  1. numReduceTask > numPartitioner
    • 生成几个空的part-r-xxxx文件
  2. numReduceTask < numPartitioner
    • Exception
  3. numReduceTask = 1
    • 所有mapTask的输出不管多少个分区,都会输出至这一个ReduceTask
    • 生成一个part-r-xxxx文件

排序

  1. MapTask和ReduceTask都会对数据按照key进行排序,默认按照字典顺序,以快排方式实现排序
  2. MapTask
    • 对数据按照分区进行快排
    • 写出到磁盘
    • 写出至磁盘后对文件进行归并排序
    • 处理结果放至环形缓冲区,存储MapTask输出KV值和索引数据
    • 环形缓冲区的存储写入达到80%时触发溢写
  3. ReduceTask
    • 远程从MapReduce拷贝对应分区的数据文件
    • 文件大小超过阈值 ---> 写入至磁盘,否则存在内存中
    • 磁盘文件数目超过阈值 ---> 进行一次归并排序
    • 数据拷贝完毕后 ---> 对内存和磁盘上的所有文件统一进行一次归并排序
    • 按照相同的key进行分组传给reduce()

自定义排序

  1. Bean对象实现WritableComparable接口并重写compareTo方法
  2. 因为默认对key进行排序,需要将需要排序的Bean对象作为输入传入Mapper

合并 Combiner

  1. Combiner是MR程序中Mapper和Reducer之外的一种组件
  2. Combiner的父类是Reducer
  3. Combiner和Reducer的区别
    • Combiner是在每一个MapTask所在的节点运行
    • Reducer接收全局所有的MapTask的输出结果
  4. Combiner对每一个MapTask的输出进行局部汇总,减小网络传输
  5. Combiner不能影响业务的最终逻辑
  6. Combiner的输出KV要和Reducer的KV对应

实现

  1. 自定义Combiner类继承Reducer,并重写Reduce()
  2. Driver类中设置
    • job.setCombinerClass(MyCombiner.class);

Join

类似MySQL的join,按照需求实现多表合并并输出结果

Map Join

Reduce Join容易在reduce端处理过多的数据,导致数据倾斜。解决:在Map端通过缓存存入多张表,提前处理业务逻辑,增加Map的业务,减小Reduce端压力

代码

  1. Mapper的setup阶段,将文件读取到缓存中
public class MapJoinMapper extends Mapper<LongWritableTextTextNullWritable{

    private Map<String, String> pdMap = new HashMap<>();
    private Text text = new Text();

    //任务开始前将pd数据缓存进pdMap
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //通过缓存文件得到小表数据pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        //获取文件系统对象,并开流
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        //通过包装流转换为reader,方便按行读取
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        //逐行读取,按行处理
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切割一行    
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }

        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //读取大表数据    
        String[] fields = value.toString().split("\t");

        //通过大表每行数据的pid,去pdMap里面取出pname
        String pname = pdMap.get(fields[1]);

        //将大表每行数据的pid替换为pname
        text.set(fields[0] + "\t" + pname + "\t" + fields[2]);

        //写出
        context.write(text,NullWritable.get());
    }
}
  1. 在Driver类中加载缓存
public class JoinDriver{
  public static void main(String[] args){
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    
    job.setJarByClass(JoinDriver.class);
    job.setMapperClass(JoinMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    
    job.addCacheFile(new URL("file:///...."));
    job.setNumReduceTasks(0);
    
    FileInputFormat.setInputPaths(job, new Path("..."));
    FileOutputFormat.setOutputPath(job, new Path("..."));
    job.waitForCompletion(true);
  }
}


以上是关于3. Hadoop MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop学习笔记:使用Mrjob框架编写MapReduce

3. Hadoop MapReduce

大数据之Hadoop(MapReduce):MapReduce扩展案例

Hadoop-3.0.2 覆盖源代码生效

大数据之Hadoop(MapReduce):KeyValueTextInputFormat使用案例

Hadoop实战-MapReduce之maxminavg统计