MapReduce初识
Posted shi_zi_183
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce初识相关的知识,希望对你有一定的参考价值。
MapReduce
概述
MapReduce是一种分布式并行计算框架
MapReduce和传统的并行编程模型框架的区别
传统并行计算框架 | MapReduce |
---|---|
集群架构/容错性 | 共享式(共享内存/共享存储),容错性差,拓展性较差 |
硬件/价格/扩展性 | 刀片服务器、高速网、存储区域网络SAN,价格贵,扩展性差 |
编程/学习难度 | what-how,难,编程原理和多线程的编程逻辑比较类似,需要借助很多互斥量信息锁等机制,要实现不同任务之间的同步 |
适用场景 | 实时、细粒度计算、计算密集型 |
MapReduce的策略
Map函数
比如一个n行待处理的文本数据,先将它分成一行行小数据集
函数 | 输入 | 输出 | 说明 |
---|---|---|---|
Map | <k,v> 如<行号,"a b c d"> | List(<k,v>) 如<"a",1> <"b",1>... | 将小数据集进一步解析成一批<key,value> 对,输入Map函数中进行处理,每个输入的<k1,v1> 会输出一批<k2,v2> 。<k2,v2> 是计算的中间结果。 |
InputFomat接口的Java API解释:逻辑上分割作业的输入文件集。每个切分后的inputsplit对应一个mapper,简单的说, inputsplit数量决定了mapper的个数。
在InputFormat接口中只有两个方法
1.getSplits(JobConf job, int numSplits) throws IOException;
返回InputSplit[]
2.getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
返回一个任意的recordReader<K,V>, recordReader就是处理键值对的,由recordreader作为流,读取文件,然后决定谁是key谁是value
我们经常使用的FileInputFormat抽象类,实现了InputFormat接口
Reduce函数
输入
<key,value-list>
一个键以及一堆值的列表,value-list是值的列表,如:key值为a,值列为<1,1,1>
输出
对value-list进行一个reduce的结构,对这些值进行汇总就和,并生成一个<key,value>
函数 | 输入 | 输出 | 说明 |
---|---|---|---|
Reduce | <k2,List(v2)> 如<"a",<1,1>> <"b",1> <"c",1> | <k3,v3> 如<"a",2> <"b",1> <"c",1> | 输入的中间结果<k2,List(v2)> 中的List(v2)表示是一批属于同一个k2的value |
MapReduce的体系结构
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
1、Client
用户编写的MapReduce程序通过Client提交到JobTracker端。
用户可通过Client提供的一些接口查看作业运行状态。
2、JobTracker(作业跟踪器)
- JobTracker负责资源监控和作业调度
- JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
- JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
3、TaskTracker(任务跟踪器)
执行具体的相关任务一般接受JobTracker发过来的命令
把一些自己的资源使用情况,以及任务的运行进度通过心跳的方式,也就是heartbeat发送给JobTracker
4、Task
Task分为Map Task和Reduce Tack两种,均由TaskTracker启动
工作流程
概述
- 不同的Map任务之间不会进行通信
- 不同的Reduce任务之间也不会发生任何信息交换
- 用户不能显式地从一台机器向另一台机器发送消息
- 所有的数据交换都是通过MapReduce框架自身去实现的
map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
Split(分片)
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
Map任务的数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
Reduce任务的数量
- 最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
- 通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
Shuffle过程
Map端的Shuffle过程
- 每个Map任务分配一个缓存
- MapReduce默认100MB缓存
- 设置溢写比例0.8
- 分区默认采用哈希函数
- 排序是默认的操作
- 排序后可以合并(Combine)
- 合并不能改变最终结果
- 在Map任务全部结束之前进行归并
- 归并得到一个大的文件,放在本地磁盘
- 文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
- JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
Reduce端的Shuffle过程 - Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
- Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
- 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
- 当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
第一个MapReduce项目
本地IDEA创建hadoop程序
创建Maven项目
修改pom.xml文件
添加项目依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ex</groupId>
<artifactId>MyMapReduce</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
选择Import Changes,下载依赖,过程可能需要一些时间
在src/main/resources
文件夹下创建log4j的日志配置文件
log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
修改windows主机中的hosts文件
C:\\Windows\\System32\\drivers\\etc\\hosts
hadoop节点ip根据自己情况修改
编写代码
在src/main/java
下新建一个包
编写Mapper类
WordCountMapper.java
package com.ex.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN, map阶段输入的key的类型:LongWritable
* VALUEIN,map阶段输入value类型:Text
* KEYOUT,map阶段输出的Key类型:Text
* VALUEOUT,map阶段输出的value类型:IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outK = new Text();
private IntWritable outV = new IntWritable(1); //map阶段不进行聚合
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
// xxxxxx xxxxxx
String line = value.toString();
// 2 切割(取决于原始数据的中间分隔符)
// xxxxxxx
// xxxxxxx
String[] words = line.split(" ");
// 3 循环写出
for (String word : words) {
// 封装outk
outK.set(word);
// 写出
context.write(outK, outV);
}
}
}
编写Reduce类
WordCountReduce.java
package com.ex.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* KEYIN, reduce阶段输入的key的类型:Text
* VALUEIN,reduce阶段输入value类型:IntWritable
* KEYOUT,reduce阶段输出的Key类型:Text
* VALUEOUT,reduce阶段输出的value类型:IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// xxxxxxx xxxxxxx ->(xxxxxxx,1),(xxxxxxx,1)
// xxxxxxx, (1,1)
// 将values进行累加
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
// 写出
context.write(key,outV);
}
}
编写Driver驱动类
package com.ex.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3 关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出的kV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\\\Data\\\\Hadoop\\\\inputfile"));
FileOutputFormat.setOutputPath(job, new Path("D:\\\\Data\\\\Hadoop\\\\out"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
本地测试
(1)需要首先配置好 HADOOP_HOME 变量以及 Windows 运行依赖
(2)在 IDEA 上运行程序
(3)在MapReduce中,如果输出路径存在会报错
运行之后,生成out目录,part-r-00000为结果文件
从IDEA中导出可执行jar包
为了方便更改输入输出路径,修改WordCountDriver.java
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
1、打开IDEA开发工具,选中我们的项目,然后依次选中
File->Project Structure…
2、创建artifact
3、编译jar程序
编译结束后,jar包可以在IDEA项目路径/out/artifacts/项目名_jar
中找到
将jar包放入集群中运行。
将inputfile放入HDFS中
运行jar
hadoop jar MyMapReduce.jar com.ex.mapreduce.wordcount.WordCountDriver input output
至此完成MapReduce初步学习
以上是关于MapReduce初识的主要内容,如果未能解决你的问题,请参考以下文章
初识Spring源码 -- doResolveDependency | findAutowireCandidates | @Order@Priority调用排序 | @Autowired注入(代码片段