MapReduce基础
Posted 不知落叶何时落
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce基础相关的知识,希望对你有一定的参考价值。
MapReduce入门
理解MapReduce的思想
- MapReduce的思想核心是
先分再合,分而治之
- 所谓的分而治之,就是把一个复杂的问题,按照一定的分解方法分为等价的规模的若干部分,然后捉个解决,分别找出各个部分的结果没然后将各个部分的结果组成问题的最终结果
- 这种思想来源于生活与工作时的经验
- Map负责拆分:即把复杂的任务
分解为若干个“简单的子任务”来进行并行处理
,可以进行拆分的前提是这些小人物可以并行运算,彼此之间几乎没有依赖
- Reduce负责合并:即对map阶段的结果进行全局汇总
- 这两个阶段合起来正式MapReduce思想的体现
如何模拟实现分布式计算
分布式计算的概念
- 分布式计算是一种计算方法,和集中式计算是相对的
- 随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要消耗的时间非常长才能够完成
- 分布式计算将应用分解成许多小的部分,分配给多台计算机进行处理,这样子可以节约整体计算时间,大大提高计算效率
Hadoop MapReduce设计思想
1. 如何对付大数据处理场景
- 对相互不具有计算以来的大数据任务计算,实现并行最自然的办法就是采取MapReduce分而治之的策略
- 首先Map阶段会进行拆分,把大数据拆分成若分成若干的小数据,多程序同时并行计算产生中间结果,然后是Reduce聚合阶段,通过程序对并行结果进行最终的汇总计算,得出最终的结果
不可拆分的计算任务或相互依赖的数据无法进行并行运算
2. 构建抽象编程模型
- MapReduce借鉴了函数式语言中的思想,用Map和Reduce 两个函数提供了高层并行百年城抽象模型
- Map: 对异族诗句进行某种重复式的处理
- Reduce: 对Map产生的中间结果进行某中进一步的结果整理
- MapReduce中定义了如下的Map和Reduce两个抽象的接口,用户需要去编程实现:
Map:(k1;v1)->(k2;v2)
Reduce:(k1;[v2])->(k3;v3) - 通过以上两个编程接口,可以看出MapReduce处理的数据类型是KV键值对
3. 统一架构、隐藏底层细节
- 如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的细节处理
MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么与具体怎么做分开了,为程序员提供一个抽象和高层的编程接口框架
程序员需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的业务程序代码
- 至于如何具体完成这个并行计算任务所相关的
诸多系统层细节全部都被隐藏起来
,交给计算框架去处理;从分布式代码的执行到大到数千小到单个节点集群的自动调度使用
MapReduce简介
MapReduce介绍
-
Hadoop MapReduce是一个分布式计算框架,用于轻松编写分布式应用程序,这些应用程序以可靠、容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)
-
MapReduce是一种面向数据处理的一种指导思想,也是用于对大规模数据进行分布式计算的编程模型
-
它的出现解决了人们在面零海量数据束手无策的问题,同时他还是易于使用和高拓展的,使得开发者无需关心分布式系统底层的复杂性即可很容易地编写分布式数据处理程序,并在呈现上万台普通服务器中运行
MapReduce的特点
易于编程
MapReduce框架提供了用于二次开发的接口:简单的实现一些接口就可以完成一个分布式程序。任务计算交给计算框架去处理,将分布式程序部署到Hadoop集群上运行和,集群节点可以拓展到成千上百个
良好的拓展性
当计算机资源不能得到满足的时候,可以通过增加记起来拓展他的计算能力,基于MapReduce的分布式计算的特点可以随机额点数目的增加保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增加至几百或者几千可以很容易处理数百TB甚至PB级别的离线数据
高容错性
Hadoop集群是分布式搭建和部署的,任何单一机器节点宕机了,它可以把上面的计算机任务转移到令一节点上运行,不影响整个作业的完成,过程完全是由Hadoop内部完成的
适合海量数据的离线处理
可以处理GB、TB和PB级别的数据量
MapReduce的局限性
MapReduce有很多优势,也有相对的局限性,局限性不代表不能做,而是在某些场合下实现的效果比较差,并不适合MapReduce来进行处理。
实时计算性能差
MapReduce主要应用于离线作业,无法做到秒级或者亚秒级的数据相应
不能进行流式计算
流式计算特点是数据时源源不断地计算的,并且数据是同台的。而MapReduce是一个离线计算框架,主要是针对静态数据集得到的,数据是不能动态变化的。
MapReduce编程
MapReduce框架体系
-
一个完整的mapReduce程序在分布式运行时有三种实时进程
MRAppMaster
:负责整个程旭过程调度及状态协调MapTask
:负责Map阶段的整个数据处理流程ReduceTask
:负责Reduce阶段的整个数据处理流程
-
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务非常复杂,那就只能多个MapReduce程序穿行运行
MapReduce编程规范
- 用户编写的程序代码分为三个部分:Mapper、Reducer、Driver(客户端提交作业驱动程序)
- 用户自定义的Mapper和Reducer都要继承各自的父类
Mapper的业务代码罗杰卸载map()方法中
Reducer的业务代码逻辑卸载reduce()方法中
整个程序需要一个Driver来进行提交,提交的是一个表述了各种必要信息的job对象 - 注意:整个MapReduce程序中,数据都是以KV键值对的形式流转的
- 在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出的KV分别是什么
- MapReduce内置了很多默认属性,比如说排序、分组,都和数据的K相关,所以说数据类型确定极其重要
MapReduce内置执行流程:基本流程
虽然MapReduce从外表上看来就两个阶段Map和Reduce,但是内部却包含了许多默认组件和默认行为。包括:
组件:读取数据左键InputFormat、输出数据组件OutputFormat
行为:排序(key的字典需排序)、分组(Reduce阶段key相同的分为一组、一组调用一次reduce处理)
Writable序列化机制
概念
序列化:序列化是将结构化对象转换成字节流便于进行网络传输或者写入持久化存储的过程
反序列化: 反序列化是将字节流转范围一些列结构化对象的过程,重新创建该对象
Java序列化机制
- Java中一切都是对象,开发过程中,经常会涉及到下属场景:
跨进程、跨网络传输对象
将对象数据持久化存储 - 这就需要一种可以在两端传输数据协议
- Java序列化机制就是为了解决这个问题而产生的、
Java对象序列化的机制,把对象表示成一个二进制字节数组
,里面包含了对象的数据、对象的数据类型、对象内部的数据的类型信息,通过保存或者转移这些二进制数据达到持久化、传输的目的- 要实现序列化,需要实现
java.io.Serializable
接口,反序列化时和序列化相反的过程,就是将二进制数组转化为对象的过程
Hadoop序列化机制
- Hadoop序列化没有采用Java序列化机制,而是自己实现了自己的序列化机制: Writable。
- 原因在于Java序列化机制比较臃肿,是重量级的,是一种不断的创建对象的机制,并i企鹅会附带很多信息(校验、继承关系系统等等)
- 但是Hadoop的序列化机制中,用户可以服用对象,可以减少Java对象的分配和回收,提高了应用效率
- Hadoop通过Writable接口实现序列化机制,接口提供两个方法:write和readFields。
write是序列化方法,用于把对象指定的字段写出去
readFields是反序列化方法,用于从字节流中读取字段重构对象 - Hadoop没有提供对象比较功能,所以和Java中的Comparable接口合并,提供一个WritableComparable。
- WritableComparable接口可用于用户自定义对象的比较规则
Hadoop封装的数据类型
Hadoop数据类型 | Java数据类型 |
---|---|
BooleanWritable | boolean |
ByteWritable | byte |
IntWritable | int |
FloatWritable | float |
LongWritable | long |
DoubleWritable | double |
Text | String |
MapWritable | map |
ArrayWritable | array |
NullWritable | null |
Hadoop自定义数据类型
- 如果觉得Hadoop内置的数据类型不够用,比如像自定义JavaBean封装数据传递,则自定义对象必须实现Hadoop的序列化机制:Writable
- 如果需要将自定义的对象作为Key传递,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对Key必须能够排序,需要指定对象的排序规则是什么
WordCount案例
在这里将进行一个案例分析: WordCount:字词统计,这里将对其词语个数进行一个分析,其本质比较简单,说到底就是根据分隔符进行一个拆分,随后进行统计,整体流程比较简单,主要目的是通过此次案例分析,更好的认识、理解MapReduce。
依赖准备:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
</dependencies>
Mapper代码:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author wxk
*/
public class WordMapper extends Mapper<LongWritable, Text,Text,LongWritable>
private Text keyOut =new Text();
private final LongWritable out=new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String [] worlds = value.toString().split("\\\\s+");
for (String word : worlds)
keyOut.set(word);
//写入到上下文中
context.write(keyOut,out);
在这里我将所有的包也写入了进去,主要目的是防止导包倒错。比如说Text是hadoop.io.Text
而不是其他的Text
再导入包之后对其Mapper进行一个解释:
在这里可以看到这里存在4个值 KEYIN
VALUEIN
KEYOUT
VALUEOUT
分别对应进入的键、进入的值,输出出去的键、输出出去的值。
根据我们的业务,输入的键是偏移量,值是字符串,而输出的时候键是字符串,值是数字。兼顾大文本量,这里采用Long
所以到最后我们的继承就是Mapper<LongWritable, Text,Text,LongWritable>
那么后续的Reduce理解起来就相对比较容易了
Reduce代码
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author wxk
*/
public class WordReduce extends Reducer<Text, LongWritable,Text,LongWritable>
private LongWritable result=new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
long count = 0;
for (LongWritable value : values)
count+=value.get();
result.set(count);
//输出
context.write(key,result);
Driver代码
V1.0版本
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author wxk
*/
public class WordDriver
public static void main(String[] args) throws Exception
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf,WordDriver.class.getSimpleName());
//设置作业的驱动
job.setJarByClass(WordDriver.class);
//设置Mapper
job.setMapperClass(WordMapper.class);
//设置Reduce
job.setReducerClass(WordReduce.class);
//设置Mapper阶段输出的Key
job.setMapOutputKeyClass(Text.class);
//设置Mapper阶段输出的Value
job.setMapOutputValueClass(LongWritable.class);
//设置Reduce阶段输出的Key的类型
job.setOutputKeyClass(Text.class);
//设置Reduce阶段输出的Value的类型
job.setOutputValueClass(LongWritable.class);
//设置文件的路径 《上级文件夹》
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置输出结果的文件夹
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交任务
boolean b = job.waitForCompletion(true);
//判断任务是否执行成功
if (b)
System.out.println("OK");
else
System.out.println("error");
V2.0版本:MapReduce推荐使用该版本进行操作
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author wxk
* @date 2022/11/21/8:53
*/
public class WordCount2 extends Configured implements Tool
public static void main(String[] args) throws Exception
Configuration configuration =new Configuration();
int run = ToolRunner.run(configuration, new WordCount2(), args);
System.out.println("status="+run);
@Override
public int run(String[] args) throws Exception
// 创建作业实例
Job job = Job.getInstance(getConf(),WordCount2.class.getSimpleName());
//设置作业的驱动
job.setJarByClass(WordDriver.class);
//设置Mapper
job.setMapperClass(WordMapper.class);
//设置Reduce
job.setReducerClass(WordReduce.class);
//设置Mapper阶段输出的Key
job.setMapOutputKeyClass(Text.class);
//设置Mapper阶段输出的Value
job.setMapOutputValueClass(LongWritable.class);
//设置Reduce阶段输出的Key的类型
job.setOutputKeyClass(Text.class);
//设置Reduce阶段输出的Value的类型
job.setOutputValueClass(LongWritable.class);
//设置输入文件的文件夹路径 切记:一定是文件夹路径而不是文件的具体路径
FileInputFormat.addInputPath(job,new Path(args[0]));
//设置输出结果的文件夹
FileOutputFormat.setOutputPath(job,new Path(args[1]));
return job.waitForCompletion(true)?0:1;
打包、发送到服务器上
在pom.xml中添加以下代码:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>MapReduceTest.WordDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
在这里指定入口:随后点击Maven的Clean 然后点击package即可,随后将target下的文件进行复制到服务器上
运行
在以往的搭建中,运行Jar包是通过Jar这个命令,但是MapReduce这个文件的Jar包他好需要依赖于Hadoop,比如说HDFS等等,这里Hadoop就提供了相应的方法,可以直接完成:
hadoop jar ***.jar args1 args2
这里的args1 、args2是因为方法中要使用args参数,所以需要传参,如果在文件中已经写死了,那么就没有必要在写了
、
运行jar包,运行结果:

去查看文件输出
可见分词成功
运行流程
Map阶段执行流程
- 第一阶段: 把输入目录下的文件按照一定的标准逐个
进行逻辑切片
,形成切片规划
默认Split size=Block Size,每个切片由一个MapTask处理 - 第二阶段:对切片中的数据按照一定的规则读取解析,返回<k,v>键值对
默认是按照行读取数据,Key是每一行的起始位置偏移量,Value是本行的文本内容
- 第三阶段:调用Mapper类中的Map方法处理数据
每读取解析出来一个<k,v>,就调用一次Map方法
- 第四阶段:按照一定的规则对Map输出的键值对进行分区partition,默认不分区,因为只有一个reducetask。
分区的数量就是reduce task的运行数量 - 第五阶段:Map输出数据写入内存缓冲区,达到比例溢出到磁盘上,
溢出spil
的时候根据key进行排序sort
默认是根据key字典许排序 - 第六阶段:对所有一处文件进行最终的
merge合并
,成为一个文件。
Reduce阶段执行流程
执行流程图
以上是关于MapReduce基础的主要内容,如果未能解决你的问题,请参考以下文章