MapReduce初识

Posted shi_zi_183

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce初识相关的知识,希望对你有一定的参考价值。

MapReduce

概述

MapReduce是一种分布式并行计算框架

MapReduce和传统的并行编程模型框架的区别

传统并行计算框架MapReduce
集群架构/容错性共享式(共享内存/共享存储),容错性差,拓展性较差
硬件/价格/扩展性刀片服务器、高速网、存储区域网络SAN,价格贵,扩展性差
编程/学习难度what-how,难,编程原理和多线程的编程逻辑比较类似,需要借助很多互斥量信息锁等机制,要实现不同任务之间的同步
适用场景实时、细粒度计算、计算密集型

MapReduce的策略

Map函数

比如一个n行待处理的文本数据,先将它分成一行行小数据集

函数输入输出说明
Map<k,v><行号,"a b c d">List(<k,v>)<"a",1> <"b",1>...将小数据集进一步解析成一批<key,value>对,输入Map函数中进行处理,每个输入的<k1,v1>会输出一批<k2,v2><k2,v2>是计算的中间结果。


InputFomat接口的Java API解释:逻辑上分割作业的输入文件集。每个切分后的inputsplit对应一个mapper,简单的说, inputsplit数量决定了mapper的个数。
在InputFormat接口中只有两个方法
1.getSplits(JobConf job, int numSplits) throws IOException;
返回InputSplit[]
2.getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
返回一个任意的recordReader<K,V>, recordReader就是处理键值对的,由recordreader作为流,读取文件,然后决定谁是key谁是value
我们经常使用的FileInputFormat抽象类,实现了InputFormat接口

Reduce函数

输入
<key,value-list>
一个键以及一堆值的列表,value-list是值的列表,如:key值为a,值列为<1,1,1>
输出
对value-list进行一个reduce的结构,对这些值进行汇总就和,并生成一个<key,value>

函数输入输出说明
Reduce<k2,List(v2)><"a",<1,1>> <"b",1> <"c",1><k3,v3><"a",2> <"b",1> <"c",1>输入的中间结果<k2,List(v2)>中的List(v2)表示是一批属于同一个k2的value

MapReduce的体系结构

MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
1、Client
用户编写的MapReduce程序通过Client提交到JobTracker端。
用户可通过Client提供的一些接口查看作业运行状态。
2、JobTracker(作业跟踪器)

  • JobTracker负责资源监控和作业调度
  • JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
  • JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

3、TaskTracker(任务跟踪器)
执行具体的相关任务一般接受JobTracker发过来的命令
把一些自己的资源使用情况,以及任务的运行进度通过心跳的方式,也就是heartbeat发送给JobTracker
4、Task
Task分为Map Task和Reduce Tack两种,均由TaskTracker启动

工作流程

概述

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过MapReduce框架自身去实现的

map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
Split(分片)
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
Map任务的数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
Reduce任务的数量

  • 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
  • 通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
    Shuffle过程


Map端的Shuffle过程

  • 每个Map任务分配一个缓存
  • MapReduce默认100MB缓存
  • 设置溢写比例0.8
  • 分区默认采用哈希函数
  • 排序是默认的操作
  • 排序后可以合并(Combine)
  • 合并不能改变最终结果
  • 在Map任务全部结束之前进行归并
  • 归并得到一个大的文件,放在本地磁盘
  • 文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
  • JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
    Reduce端的Shuffle过程
  • Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
  • Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
  • 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
  • 当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce

第一个MapReduce项目

本地IDEA创建hadoop程序

创建Maven项目

修改pom.xml文件

添加项目依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ex</groupId>
    <artifactId>MyMapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

选择Import Changes,下载依赖,过程可能需要一些时间

src/main/resources文件夹下创建log4j的日志配置文件

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

修改windows主机中的hosts文件
C:\\Windows\\System32\\drivers\\etc\\hosts

hadoop节点ip根据自己情况修改
编写代码
src/main/java下新建一个包

编写Mapper类
WordCountMapper.java

package com.ex.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * KEYIN, map阶段输入的key的类型:LongWritable
 * VALUEIN,map阶段输入value类型:Text
 * KEYOUT,map阶段输出的Key类型:Text
 * VALUEOUT,map阶段输出的value类型:IntWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);  //map阶段不进行聚合

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行
        // xxxxxx xxxxxx
        String line = value.toString();

        // 2 切割(取决于原始数据的中间分隔符)
        // xxxxxxx
        // xxxxxxx
        String[] words = line.split(" ");

        // 3 循环写出
        for (String word : words) {
            // 封装outk
            outK.set(word);

            // 写出
            context.write(outK, outV);
        }
    }
}

编写Reduce类
WordCountReduce.java

package com.ex.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * KEYIN, reduce阶段输入的key的类型:Text
 * VALUEIN,reduce阶段输入value类型:IntWritable
 * KEYOUT,reduce阶段输出的Key类型:Text
 * VALUEOUT,reduce阶段输出的value类型:IntWritable
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        // xxxxxxx xxxxxxx ->(xxxxxxx,1),(xxxxxxx,1)
        // xxxxxxx, (1,1)
        // 将values进行累加
        for (IntWritable value : values) {
            sum += value.get();
        }

        outV.set(sum);

        // 写出
        context.write(key,outV);
    }
}

编写Driver驱动类

package com.ex.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 设置jar包路径
        job.setJarByClass(WordCountDriver.class);

        // 3 关联mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置map输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出的kV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\\\Data\\\\Hadoop\\\\inputfile"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\\\Data\\\\Hadoop\\\\out"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

本地测试
(1)需要首先配置好 HADOOP_HOME 变量以及 Windows 运行依赖
(2)在 IDEA 上运行程序
(3)在MapReduce中,如果输出路径存在会报错
运行之后,生成out目录,part-r-00000为结果文件


从IDEA中导出可执行jar包

为了方便更改输入输出路径,修改WordCountDriver.java

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

1、打开IDEA开发工具,选中我们的项目,然后依次选中
File->Project Structure…

2、创建artifact

3、编译jar程序




编译结束后,jar包可以在IDEA项目路径/out/artifacts/项目名_jar中找到

将jar包放入集群中运行。

将inputfile放入HDFS中

运行jar

hadoop jar MyMapReduce.jar com.ex.mapreduce.wordcount.WordCountDriver input output



至此完成MapReduce初步学习

以上是关于MapReduce初识的主要内容,如果未能解决你的问题,请参考以下文章

初识Spring源码 -- doResolveDependency | findAutowireCandidates | @Order@Priority调用排序 | @Autowired注入(代码片段

初识Hadoop-MapReduce

MapReduce初识

初识MapReduce

初识MapReduce

Hadoop学习之路(十三)MapReduce的初识