MapReduce工作流程最详细解释

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce工作流程最详细解释相关的知识,希望对你有一定的参考价值。

参考技术A

Mapreduce简介

Hadoop MapReduce 源于Google发表的 MapReduce论文。Hadoop MapReduce 其实就是Google MapReduce的一个克隆版本。Hadoop 2.0即第二代Hadoop系统,其框架最核心的设计是HDFS、MapReduce和YARN。其中,HDFS为海量数据提供存储,MapReduce用于分布式计算,YARN用于进行资源管理。

其实,一次mapreduce过程就包括上图的6个步骤,input、splitting、mapping、shuffling、redecing、final redult。

文件要存储在HDFS中,每个文件被切分成多个一定大小的块也就是Block,(Hadoop1.0默认为64M,Hadoop2.0默认为128M),并且默认3个备份存储在多个的节点中。

MR通过Inputformat将数据文件从HDFS中读入取,读取完后会对数据进行split切片,切片的数量根据Block的大小所决定,然后每一个split的个数又决定map的个数,即一个split会分配一个maptask并行实例处理。

如何确定切分的文件大小?

数据进入到map函数中,然后开始按照一定的规则切分。其实这就是我们自定义的计算逻辑,我们编写mr程序的map函数的逻辑一般就在这个阶段执行。企业应用为了追求开发效率,一般都使用hive sql代替繁琐的mr程序了,这里附上一个经典的wordcount的map函数重温一下吧。

Shuffle是我们不需要编写的模块,但却是十分关键的模块。

在map中,每个 map 函数会输出一组 key/value对, Shuffle 阶段需要从所有 map主机上把相同的 key 的 key value对组合在一起,(也就是这里省去的Combiner阶段)组合后传给 reduce主机, 作为输入进入 reduce函数里。

Partitioner组件 负责计算哪些 key 应当被放到同一个 reduce 里

HashPartitioner类,它会把 key 放进一个 hash函数里,然后得到结果。如果两个 key 的哈希值 一样,他们的 key/value对 就被放到同一个 reduce 函数里。我们也把分配到同一个 reduce函数里的 key /value对 叫做一个reduce partition.

我们看到 hash 函数最终产生多少不同的结果, 这个 Hadoop job 就会有多少个 reduce partition/reduce 函数,这些 reduce函数最终被JobTracker 分配到负责 reduce 的主机上,进行处理。

Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写; 溢写前对数据进行排序 ,排序按照对key的索引进行字典顺序排序,排序的手段快排;溢写产生大量溢写文件,需要 对溢写文件进行归并排序 ;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后, 采用归并排序将内存和磁盘中的数据都进行排序 。在进入Reduce方法前,可以对数据进行分组操作。值得注意的是, 整个shuffle操作是有3次排序的。

reduce() 函数以 key 及对应的 value 列表作为输入,按照用户自己的程序逻辑,经合并 key 相同的 value 值后,产 生另外一系列 key/value 对作为最终输出写入 HDFS。

mapreduce运行流程总结

     先上图,下图描绘了一个mapreduce程序的的一般运行过程和需要经过的几个阶段

技术分享

  大体上我们可以将mapreduce程序划分为inputformat ,map ,shuffle,reduce,outputformat五个阶段,下面我们会详细介绍各个阶段的具体的运行细节

  以最简单的wordcount程序为例,本例使用基于hadoop2.6的环境,一般的api都使用mapreudce下的,注意不要使用mapred下的api可能会引起未知错误

 

   惯例hello word程序

driver类,负责构建mapredue任务,设置job的名称,指定任务的输入文件并设置相应的读取类,map处理类,reduce处理类,输出文件路径,mapreduce程序从driver类开始,程序运行时会根据Configuration读取到配置和yarn通信,申请运行任务的资源,申请资源之后就开始将jar包发送到yarn的各个节点执行map和reudce任务

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.IntWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 9 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
12 
13 import java.io.IOException;
14 
15 /**
16  * Created by teaegg on 2016/11/21.
17  */
18 public class WordcountDriver {
19     public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
20         runjob(args);
21     }
22 
23 
24     public static void runjob(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
25         Configuration conf = new Configuration();//创建任务的配置对象,会自动加载hadoop的的配置文件
26         //Job job = new Job(conf);是废弃的写法,虽然依然可以使用
27         Job job  = Job.getInstance(conf);
28         job.setJobName("wordcount");//设置任务的名称
29 
30         //hadoop中对java一般的常用的数据类型做了封装形成了mapreduce专用的数据类型
31         job.setOutputKeyClass(Text.class);//Text.class对应了String类型
32         job.setOutputValueClass(IntWritable.class);//IntWritable.class对应了int类型
33 
34         job.setMapperClass(WordcountMap.class);//设置任务的map类
35         job.setReducerClass(WordcountReduce.class);//设置任务的reduce类
36 
37         job.setMapOutputKeyClass(Text.class);//设置map端输出数据类型
38         job.setMapOutputValueClass(IntWritable.class);//设置mapd端reduce的数据类型
39         //注意以上都要和map的write方法写出类型一致
40 
41 
42         //1. 这里不设置TextInputFormat.class也没有关系,hadoop默认调用TextInputFormat类处理,
43         //2. 注意TextInputFormat类不能用来读取Sequncefile类型文件
44         job.setInputFormatClass(TextInputFormat.class);//设置输入文件读取的类,可以hdfs上处理一般的文本文件
45         job.setOutputFormatClass(TextOutputFormat.class);//设置输出的文件类型
46 
47         FileInputFormat.addInputPath(job, new Path(args[0]));//设置hdfs上输入文件路径,这里可以传入多个文件路径
48         // 可以再添加一个输入文件路径
49         //如 FileInputFormat.addInputPath(job, new Path(" "));
50         FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置hdfs上的输出文件路径,
51 
52         job.waitForCompletion(true);
53     }
54 
55 }

 

 

 

 map类,其实再map类执行之前还有一个过程inputformat,这里没有详细介绍,inputformat过程负责将输入路径传入的文件做分片,每一个分片会生成一个map任务,并且会读取输入的文件分片,并返回一行记录recordreader对象交给map方法执行,就这样不断的生成recordreader对象交给map去执行,recordreader对应的就是map方法中解析的kv键值对

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.Text;
 6 import org.apache.hadoop.mapreduce.Mapper;
 7 
 8 import java.io.IOException;
 9 
10 /**
11  * Created by teaegg on 2016/11/21.
12  */
13 public class WordcountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
14 
15     private Text outkey;
16 
17     private IntWritable outvalue = new IntWritable(1);
18 
19     public void map(LongWritable key, Text value, Context context)
20             throws IOException, InterruptedException {
21 
22         //LongWritable key是这一行记录相对文件的偏移量,一般情况用不上,可以忽视
23         //inputformat过程默认采用Textinputformat类来读取hdfs上一般的文本文件,并将每一行记录转化成Text对象
24         //这是inputformat过程的一个主要职责,读取数据并交给map来处理,
25 
26         String line = value.toString();//toString方法可以将Text类型的数据转换成String
27         String[] item = line.split(" ");//采用空格分隔每行数据
28         for (String str: item) {
29             outkey = new Text(str);    //将每一个word再转化成Text类型
30 
31             //1. 这一步是关键一步,无论是inputformat还是map或者reduce,其处理的数据都是键值对类型
32             //2. 这里调用context.write方法,将map处理好的结果写出到磁盘上,然后数据会根据key做排序,shuffle并最终
33             //到达reduce端交给reduce方法继续处理
34             //3. map端可以多次调用write方法,每次调用都是一个写出的键值对结果,
35             context.write(outkey, outvalue);
36         }
37     }
38 
39     }

 

reduce类,reduce方法每次调用的时候会处理一个map输出的key和这个key下所有的输出的value值,

 1 package mapreduce.wordcount;
 2 
 3 import org.apache.hadoop.io.IntWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Reducer;
 6 
 7 import java.io.IOException;
 8 
 9 /**
10  * Created by teaegg on 2016/11/21.
11  * <p>
12  * 1. map端传入的是“一行”数据,而在reduce中,输入的key  即Text key是map端输出的context.write(outkey, outvalue)中设置的key
13  * 2. 而reduce方法中输入参数Iterable<IntWritable> values  是map端输出键值对相同的key下所有的value的集合
14  */
15 public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
16 
17     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
18         int sum = 0;
19         for (IntWritable val : values) {
20             sum += val.get();
21         }
22         context.write(key, new IntWritable(sum));
23     }
24 }

 

以上是关于MapReduce工作流程最详细解释的主要内容,如果未能解决你的问题,请参考以下文章

mapreduce运行流程总结

MapReduce详细解析完整流程

Hadoop MapReduce 1.x 工作原理

Yarn流程Yarn与MapReduce 1相比

Hadoop MapReduce 1.x 工作原理

MapReduce与Yarn 的详细工作流程分析