Hadoop-MapReduce

Posted 丶落幕

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop-MapReduce相关的知识,希望对你有一定的参考价值。

Hadoop-MapReduce

1 MapReduce 概述

1.1 MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析 应用”的核心框架。

MapReduce 核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个 Hadoop 集群上。

1.2 MapReduce 优缺点

1.2.1 优点

1)MapReduce 易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量 廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一 样的。就是因为这个特点使得 MapReduce 编程变得非常流行。

2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性

MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高 的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。

4)适合 PB 级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2 缺点

1)不擅长实时计算

MapReduce 无法像 mysql 一样,在毫秒或者秒级内返回结果。

2)不擅长流式计算

流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。 这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。

3)不擅长 DAG(有向无环图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下, MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘, 会造成大量的磁盘 IO,导致性能非常的低下

1.3 MapReduce 核心思想


(1)分布式的运算程序往往需要分成至少 2 个阶段。

(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。

(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段
的所有 MapTask 并发实例的输出。

(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业
务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

总结:分析 WordCount 数据流走向深入理解 MapReduce 核心思想。

1.4 MapReduce 进程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

(1)MrAppMaster:负责整个程序的过程调度及状态协调。

(2)MapTask:负责 Map 阶段的整个数据处理流程。

(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。

1.5 官方 WordCount 源码

采用反编译工具反编译源码,发现 WordCount 案例有 Map 类、Reduce 类和驱动类。且 数据的类型是 Hadoop 自身封装的序列化类型。

1.6 常用数据序列化类型

Java类型Hadoop Writable 类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

1.7 MapReduce 编程规范

用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。

1.Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

(3)Mapper中的业务逻辑写在map()方法中

(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)

(5)map()方法(MapTask进程)对每一个<K,V>调用一次

2.Reducer阶段

(1)用户自定义的Reducer要继承自己的父类

(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

(3)Reducer的业务逻辑写在reduce()方法中

(4)ReduceTask进程对每一组相同k的组调用一次reduce()方法

3.Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是 封装了MapReduce程序相关运行参数的job对象

1.8.1 本地测试

1)需求

在给定的文本文件中统计输出每一个单词出现的总次数


2)需求分析

按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。

3)环境准备

(1)创建 maven 工程,MapReduceDemo

(2)在 pom.xml 文件中添加如下依赖

<dependencies>
 	<dependency>
 		<groupId>org.apache.hadoop</groupId>
 		<artifactId>hadoop-client</artifactId>
 		<version>3.1.3</version>
 	</dependency>
 	<dependency>
 		<groupId>junit</groupId>
 		<artifactId>junit</artifactId>
 		<version>4.12</version>
 	</dependency>
 	<dependency>
	 	<groupId>org.slf4j</groupId>
 		<artifactId>slf4j-log4j12</artifactId>
 		<version>1.7.30</version>
 	</dependency>
</dependencies>

(3)在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在 文件中填入。

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

1.8.2 编写测试类

Mapper

public class WorldCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> 
    private Text text = new Text();
    private IntWritable intWritable = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        //1. 获取一行
        String line = value.toString();
        //2. 切割
        String[] words = line.split(" ");
        //3. 循环写出
        for (String word : words) 
            text.set(word);
            //写出
            context.write(text, intWritable);
        
    

Reducer

public class WorldCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> 
    private IntWritable intWritable=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
        //累加
        int sum=0;
        for (IntWritable value : values) 
            sum+=value.get();
        
        intWritable.set(sum);
        //写出
        context.write(key, intWritable);
    

Driver

public class WorldCountDriver 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
        //1 获取job
        Configuration configuration=new Configuration();
        Job job = Job.getInstance(configuration);
        //2 设置jar包路径
        job.setJarByClass(WorldCountDriver.class);
        //3 关联mapper和reduce
        job.setMapperClass(WorldCountMapper.class);
        job.setReducerClass(WorldCountReduce.class);
        //4 设置map输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //5 设置最终输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //6 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\\\hadoop-test\\\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\\\hadoop-test\\\\output"));
        //7 提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    

1.8.3 提交到集群测试

修改路径参数(改为命令行参数)

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

打包插件

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

打好jar包,在linux上运行jar包(最小包,环境linux上面有)

com.cssl.WorldCountDriver(指定main函数)

hadoop jar MapReduce-Test-1.0-SNAPSHOT.jar com.cssl.WorldCountDriver /qingguo /output

2 Hadoop 序列化

2.1 序列化概述

1)什么是序列化

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁 盘(持久化)和网络传输。

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2)为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能 由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的” 对象,可以将“活的”对象发送到远程计算机。

3)为什么不用 Java 的序列化

Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带 很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以, Hadoop 自己开发了一套序列化机制(Writable)。

4)Hadoop 序列化特点:

(1)紧凑 :高效使用存储空间。

(2)快速:读写数据的额外开销小。

(3)互操作:支持多语言的交互

2.2 自定义 bean 对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部 传递一个 bean 对象,那么该对象就需要实现序列化接口。

(1)必须实现 Writable 接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

(3)重写序列化方法

(4)重写反序列化方法

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写 toString(),可用"\\t"分开,方便后续用。

(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。

2.3 序列化案例实操

1)需求

统计每一个手机号耗费的总上行流量、总下行流量、总流量

(1)输入数据 phone_data .txt

(2)输入数据格式:

(3)期望输出数据格式

2)需求分析

3)编写 MapReduce 程序

编写流量统计的 Bean 对象

public class FlowBean implements Writable 
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    ...省略get and set

    public FlowBean() 
    

    @Override
    public void write(DataOutput dataOutput) throws IOException 
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    

    @Override
    public void readFields(DataInput dataInput) throws IOException 
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    

    @Override
    public String toString() 
        return upFlow +"\\t" + downFlow +"\\t" + sumFlow;
    

编写 Mapper 类

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> 
    private  Text outK=new Text();
    private  FlowBean outV=new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        //1. 获取一行
        String line = value.toString();
        //2. 切割
        String[] split = line.split("\\t");
        //3. 抓取想要的数据    手机号 上行流量 下行流量
        String phone=split[1];
        String up=split[split.length-3];
        String down=split[split.length-2];
        //4. 封装数据
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        //5. 写出
        context.write(outK, outV);
    

编写Reducer类

public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> 

    private FlowBean outV=new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException 
        //1. 变量集合累加值
        long totalUp=0;
        long totalDown=0;
        for (FlowBean value : values) 
            totalUp+=value.getUpFlow();
            totalDown+=value.getDownFlow();
        
        //2. 封装outK,outV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        //3. 写出
        context.write(key, outV);
    

编写Driver类

public class FlowDriver 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
        //获取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //设置jar
        job.setJarByClass(FlowDriver.class);
        //绑定mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        //设置mapper输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //设置最终输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\\\hadoop-test\\\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\\\hadoop-test\\\\output"));
        //提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    

3 MapReduce 框架原理

3.1 InputFormat 数据输入

3.1.1 切片与 MapTask 并行度决定机制

1)问题引出

MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。

思考:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数 据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因 素影响了 MapTask 并行度?

2)MapTask 并行度决定机制

数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行 存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask。

3.1.2 Job 提交流程源码和切片源码详解

1)Job 提交流程源码详解

waitForCompletion()
submit();
	// 1 建立连接
	connect();
		// 1)创建提交 Job 的代理
		new Cluster(getConfiguration());
			// (1)判断是本地运行环境还是 yarn 集群运行环境
			initialize(jobTrackAddr, conf);
	// 2 提交 job
	submitter.submitJobInternal(Job.this, cluster)
		// 1)创建给集群提交数据的 Stag 路径
		Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
		// 2)获取 jobid ,并创建 Job 路径
		JobID jobId = submitClient.getNewJobID();
		// 3)拷贝 jar 包到集群
		copyAndConfigureFiles(job, submitJobDir);
		rUploader.uploadFiles(job, jobSubmitDir);
		// 4)计算切片,

以上是关于Hadoop-MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop-MapReduce

hadoop-mapreduce--统计单词数量

初识Hadoop-MapReduce

hadoop-MapReduce概述

Hadoop-MapReduce应用-统计单词个数

16-hadoop-mapreduce简介