第九篇博客 主要是MapReduce
Posted 奋斗猿666
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第九篇博客 主要是MapReduce相关的知识,希望对你有一定的参考价值。
MapReduce的处理过程分为两个步骤:map和reduce。每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定。map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总。
例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录
为了使用MapReduce找出历史上每年的最高温度,我们将行数作为map输入的key,每一行的文本作为map输入的value:
map函数对每一行记录进行处理,提取出(年份,温度)形式的键值对,作为map的输出:
(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1947,78)
reduce收到的数据格式像这样:
(1949,[111,78]
(1950,[0,22,-11]
如果有多个map任务同时运行(通常都是这样),那么每个map任务完成后,都会向reduce发送上面格式的数据,发送数据的过程叫shuffle。
最后reduce输出即为每年最高气温:
(1949,111)
(1950,22)
在Hadoop中,map和reduce的操作可以由多种语言来编写,例如Java、Python、Ruby等。
假设现在有5年(2011-2015)的天气数据,分布存放在3个文件中: weather1.txt,weather2.txt,weather3.txt。再假设我们现在有一个3台机器的集群,并且map任务实例数量为3,reduce实例数量2。
机器一 map1 reduce1
机器二 map2 reduce2
机器三 map3
首先将本地的文件上传到HDFS:
hadoop fs -copyFromLocal /home/data/hadoop_book_input/ hdfs://master:9000/input
public void run(Context context) throws IOException, InterruptedException
setup(context);
try
// 遍历分配给该任务的数据,循环调用map
while (context.nextKeyValue())
map(context.getCurrentKey(), context.getCurrentValue(), context);
finally
cleanup(context);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
// 行作为输入值 key在这里暂时不需要使用
String line = value.toString();
// 提取年份
String year = line.substring(15, 19);
// 提取气温
int airTemperature = parseTemperature( line );
String quality = line.substring(92, 93);
// 过滤脏数据
boolean isRecordClean = airTemperature != MISSING && quality.matches("[01459]");
if ( isRecordClean )
context.write(new Text(year), new IntWritable(airTemperature));
接着实现Reducer:
public void run(Context context) throws IOException, InterruptedException
setup(context);
try
while (context.nextKey())
reduce(context.getCurrentKey(), context.getValues(), context);
// 一个key处理完要转向下一个key时,重置值遍历器
Iterator iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator)
((ReduceContext.ValueIterator)iter).resetBackupStore();
finally
cleanup(context);
我们的Reducer实现主要是找出最高气温:
public void reduce(Text key, Iterable values,
Context context)
throws IOException, InterruptedException
int maxValue = findMax( values );
context.write(key, new IntWritable(maxValue));
private static int findMax(Iterable values)
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values)
maxValue = Math.max(maxValue, value.get());
return maxValue;
Mapper和Reducer实现后,需要一个入口提交作业到Hadoop集群,使用YARN框架来运行MapReduce作业。作业配置如下:
public class MaxTemperature
public static void main(String[] args) throws Exception
if (args.length != 2)
System.err.println("Usage: MaxTemperature ");
System.exit(-1);
// 设置jar包及作业名称
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName(“Max temperature”);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置Mapper和Reducer实现
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
// 设置输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 等待作业完成后退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
hadoop MaxTemperature /input/ncdc/sample.txt /output
日志中可以看到作业的一些运行情况,例如map任务数量,reduce任务数量,以及输入输出的记录数,可以看到跟实际情况完全吻合。
一个MapReduce作业通常包括输入数据、MapReduce程序以及一些配置信息。Hadoop把作业分解为task运行,task分为map任务和reduce任务,在新版本的Hadoop中,这些Task通过资源管理框架进行调度,如果任务失败,MapReduce应用框架会重新运行任务。
作业的输入被华为为固定大小的分片,叫input splits,简称splits。然后为每一个split分块创建一个map任务,map任务对每一条记录运行用户定义的map函数。划分为split之后,不同配置的机器就可以根据自己的资源及运算能力运行适当的任务,即使是相同配置的机器,最后运行的任务数也往往不等,这样能有效利用整个集群的计算能力。但是split也不已太多,否则会耗费很多时间在创建map任务上,通常而言,按集群Block大小(默认为128M)来划分split是合理的。
Hadoop会把map任务运行在里数据最近的节点上,最好的情况是直接在数据(split)所在的节点上运行map任务,这样不需要占用带宽,这一优化叫做数据本地优化。节点距离不是分配task考虑的唯一因素,还会考虑节点当前负载等因素。reduce针对每一个key运行reduce函数之后,输出结果通常保存在HDFS中,并且存储一定的副本数,第一个副本存在运行reduce任务的本地机器,其他副本根据HDFS写入的管道分别写入节点。
如果有多个reduce任务,那么map任务的输出到底该传输到哪一个reduce任务呢?过程叫partition。默认情况下,MapReduce使用key的哈希函数进行分桶。如果需要自行指定分区函数,可以自己实现一个Partitioner并配置到作业中。key相同的map任务输出一定会发送到同一个reduce任务。map任务的输出数据传输到reduce任务所在节点的过程,叫做shuffle。
当然,有些作业中我们可能根本不需要有reduce任务,所有工作在map任务并行执行完之后就完毕了,例如Hadoop提供的并行复制工作distcp,其内部实现就是采用一个只有Mapper,没有Reducer的MapReduce作业,在map完成文件复制之后作业就完成了
可以在map任务所在的节点上做更多工作。map任务运行完之后,可以把所有结果按年份分组,并统计出每一年的最高温度这个最高温度是局部的,只在本任务重产生的数据做比较。做完局部统计之后,将结果发送给reduce做最终的汇总,找出全局最高温度
上述的局部计算在Hadoop中使用Combiner来表示。为了在作业中使用Combiner,我们需要明确指定,在前面的例子中,可以直接使用Reducer作为Combiner,因为两者逻辑是一样的:
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
以上是关于第九篇博客 主要是MapReduce的主要内容,如果未能解决你的问题,请参考以下文章
第九篇:Map/Reduce 工作机制分析 - 数据的流向分析