Hadoop集群-hadoop运行原理解析
Posted zhang_xinxiu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop集群-hadoop运行原理解析相关的知识,希望对你有一定的参考价值。
hadoop运行原理解析
前言
前两篇博客讨论了hadoop集群的搭建过程,并介绍了几个搭建过程中可能遇到的问题,提出了解决的办法,今天就来简单说说hadoop的运行过程。
【Hadoop集群】-集群搭建踩的那些坑之hadoop篇
【Hadoop集群】-集群搭建踩的那些坑之ssh篇
基本结构
hadoop提供了分布式集群的框架,可以高效的运行在低廉的机器上,用于大数据的处理分析,提高了机器的吞吐量。
hadoop核心主要由两部分组成分别是MapReduce、HDFS。MapReduce主要是负责分布式数据计算的核心,在使用hadoop时编写程序来实现对应的Map接口和Reduce接口,并调用hadoop的集群驱动来启动并行计算。HDFS是hadoop提供的分布式的数据存储,将数据分成块存储到集群的数据节点中,使用方法和命令类似于Linux的文件存储命令。接下来详细介绍这两部分。
MapReduce
MapReduce,是hadoop中集群数据处理的核心,主要分为映射和减速两部分,映射就是Map部分,减速就是说的Reduce阶段。
- Map,就是映射阶段,在数据处理的初期,hadoop会从HDFS中获取要处理的数据,一般处理的数据是文件或者目录,并逐行或逐个的读取,可以通过编程实现Map接口来自己处理映射阶段的数据处理。
- Reduce,减速阶段,分为Shuffling和Reducer,主要是处理Map阶段后的数据,处理完成后,产生新的输出,输出到HDFS中,可以通过实现Reduce接口来处理减速器数据的输出。
在MapReduce阶段,输入的值都是键-值对的形式,在实际开发中,需要实现Map和Reduce接口中Map和Reduce方法。整个过程需要经过三个阶段,使用如下的示例来说明:
Mapper任务(分割及映射)
输入拆分-Input Splits
映射的第一阶段是将数据进行拆分,如上输入了一个文本,hadoop会把数据分成若干个大小相同的数据块,这些数据块被称为Input Splits,hadoop会根据生成的若干个数据块来生成同样个数的映射任务,可以通过Job.setInputFormat(Class)指定要输入的数据块的格式。
映射块-Mapping
会将Input Splits映射为中间键值对。映射器会根据每个输入块生成一个对应的映射任务,被映射器处理后输出的数据和映射之前的数据块的格式可以是不同的,当然在映射结束后hadoop也允许0个映射输出。
这一阶段会将每个分割传入到映射函数(map)计算出输出内容,可以通过Job.setMapperClass(Class)方法来指定映射处理器。
经过映射处理后hadoop会根据输出的键来对数据进行联合,hadoop的联合阶段会很麻烦,而且消耗很大,所以可以通过使用Job.setCombinerClass(Class)自己定制联合,以此来节省联合阶段的消耗。联合后会被分组,然后传给减速器来进行最后的输出,可以通过Job.setGroupingComparatorClass(Class)方法来指定数据在分组时的处理过程。
mapping阶段最后的输出一般是很简单的键值对,hadoop也提供了数据压缩,主要是用在数据量过大或者数据格式会影响应用程序等的情况。通过实现CompressionCodec接口,自己指定压缩格式,并通过hadoop的配置来配置相应的压缩格式。
在上面的例子中会计算输入分割单词出现的次数,然后排列成数据列表<单词,出现频率>作为输出。
Note:数据最终的映射个数通常是有输入的大小来决定的。正常的映射个数是每个节点映射器量10-100个,最好的情况是每个映射器都在非常快的时间里处理完数据。尽管如此,但是在处理大数据量时可能会需要非常多的映射个数,比如一个10TB的数据作为输入,但是每个映射块最大能处理128MB,这时会需要82,000个映射。在遇到这种情况时可以通过Configuration.set(MRJobConfig.NUM_MAPS, int),来指定映射个数。
Reducer任务(重排,还原)
经过映射后的输出数据会被排序,然后每个映射器会进行分区。总共的分区数量和映射任务的数量是相同的。可以通过实现自定义的Partitioner来指定哪些数据进入哪个Reducer。
Shuffle
对映射后的数据进行排序,然后输入到Reducer。这个阶段hadoop会根据Http协议来接收到所有映射相关的分区。
Sort
hadoop会通过输入的键对所有的Reducer进行分组。Shuffle和Sort阶段是同时进行的,在map数据输出时Reducer就会不停的接收。hadoop会提供异步任务不停的从Map输出到获取到数据,如下图的任务调度器。
Secondary Sort
在对中间键值进行分组时的规则如果不同于在Reducing阶段的,则可以通过Job.setSortComparatorClass(Class)指定比较器。由于Job.setGroupingComparatorClass(Class)可用于控制中间键的分组方式,因此可以结合使用这些键来模拟值的二级排序。
减速-Reducer
在这个阶段会调用reduce方法来对排序后的输出进行处理,这个阶段结束后会将数据写入到HDFS中,并返回一个输出值。
减速器的个数大概是0.95或1.75乘以(< 节点数 > * < 每个节点的最大容器数 >)。正常情况下0.95就已经够用了,对数据会进行非常快的处理。如果考虑到负载集群的执行效率,那么可以考虑1.75,它会充分发挥负载集群的效率。
增加减速器的数量会增加框架的开销,但是会增加负载均衡并降低故障的成本。
当然处理的过程中也允许有0个减速输出。这时映射任务的输出直接会直接进入FileSystem,进入FileOutputFormat.setOutputPath(Job,Path)设置的输出路径。在将映射输出写入FileSystem之前,框架不会对映射输出进行排序。
MapReduce任务
上面已经论述过完成的工作流程(执行Map和Reduce任务)是由两种类型的实体进行控制的,分别为:Job Tracker和Task Tracker。
- Job Tracker,主节点的工作调度器,在进行数据处理时主节点会生成一个新的工作调度。
- Task Tracker,数据节点的任务调度器,它会监听数据节点的Map和Reduce的状态,并周期性的报告给JobTracker。
- 新的数据分析任务被提交后,会在主节点上生成一个作业,随后该作业会生成多个任务,这些任务会在集群中的数据节点上运行。
- JobTracker是监听任务在不同节点的运行情况
- 在每个数据节点上执行工作时,是由TaskTracker来处理的,他会监听Map和Reduce任务,并将结果发送给JobTracker,它会周期性的发送信息给JobTracker通知主节点当前任务的执行状态。
- 这样 JobTracker 就可以跟踪每项工作的总体进度。在任务失败的情况下,JobTracker 可以在不同的 TaskTracker 重新调度它。
Hadoop应用示例
上面讨论了Hadoop的工作原理,Hadoop在工作时主要分为两个部分的任务,分别是Map和Reduce,Map过程是将外部数据映射为Hadoop程序的输入,映射完成后Reduce会通过任务来获取到Map的输出作为Reduce的输入,Reduce完成的主要是数据的汇总加工,并将最终的输出转化为FileSystem存入到Hadoop的HDFS中。这期间的工作流程都是可以通过实现接口或者重写方法来自定义转换过程,通过在启动的时候指定给要指定的工作任务即可。
下面来看个数据统计的实例,计算输入文本中的单词个数。输入是一个商品销售的分布数据,包含了产品和地区时间等,接下来统计下城市出现的次数。
Mapper类代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* Created by user on 18/12/11.
*/
public class SalesMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>
public final static IntWritable one=new IntWritable(1);
private Text word=new Text();
public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException
String input=text.toString();
StringTokenizer stringTokenizer=new StringTokenizer(input,",");
int i=0;
while (stringTokenizer.hasMoreTokens())
if (i==6)
outputCollector.collect(new Text(stringTokenizer.nextToken()),one);
i++;
Mapper中输入是一个Text字符串,是逐行从文件中读取的,然后将map后的数据按照<城市,出现频率>生成一个集合输出给Reducer处理。
Reducer类代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Iterator;
/**
* Created by user on 18/12/11.
*/
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>
public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException
int frequent=0;
Text key=text;
while (iterator.hasNext())
IntWritable intWritable=iterator.next();
frequent+=1;
outputCollector.collect(text,new IntWritable(frequent));
然后从Mapper的输出中对同一个City出现的频率进行汇总,将相同的City出现的总个数汇总成为一个集合<城市,总出现个数>输出。
Driver类代码
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
* Created by user on 18/12/11.
*/
public class SalesCountryDriver
public static void main(String args[])
JobClient jobClient=new JobClient();
JobConf jobConf=new JobConf(SalesCountryDriver.class);
jobConf.setMapperClass(SalesMapper.class);
jobConf.setReducerClass(SalesCountryReducer.class);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(jobConf,new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf,new Path(args[1]));
jobClient.setConf(jobConf);
try
JobClient.runJob(jobConf);
catch (IOException e)
e.printStackTrace();
汇总后的数据会写入到hadoop中的HDFS中,可以通过在hadoop的网站上下载,并查看输出的结果。
以上是关于Hadoop集群-hadoop运行原理解析的主要内容,如果未能解决你的问题,请参考以下文章