MapReduce经典案例—TopN

Posted 一指流沙q

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce经典案例—TopN相关的知识,希望对你有一定的参考价值。

目录

一、问题介绍

(一)案例分析

1. TopN分析法介绍

2. 案例需求及分析

(二)案例实现

1. Map阶段实现

2. Reduce阶段实现

3. Driver程序主类实现

4. 效果测试

二、完整代码

num.txt

1、TopNMapper.java

2、 TopNReducer.java

3、TopNDriver.java

三、运行结果 


一、问题介绍

(一)案例分析

1. TopN分析法介绍

TopN分析法是指从研究对象中按照某一个指标进行倒序或正序排列,取其中所需的N个数据,并对这N个数据进行重点分析的方法。

2. 案例需求及分析

先假设有数据文件num.txt,现要求使用MapReduce技术提取上述文本中最大的5个数据,并最终将结果汇总到一个文件中。

(1) 先设置MapReduce分区为1,即ReduceTask个数一定只有一个。我们需要提取TopN,即全局的前N条数据,不管中间有几个Map、Reduce,最终只能有一个用来汇总数据。

(2) 在Map阶段,使用TreeMap数据结构保存TopN的数据,TreeMap默认会根据其键的自然顺序进行排序,也可根据创建映射时提供的 Comparator进行排序,其firstKey()方法用于返回当前集合最小值的键。

(3) 在Reduce阶段,将Map阶段输出数据进行汇总,选出其中的TopN数据,即可满足需求。这里需要注意的是,TreeMap默认采取正序排列,需求是提取5个最大的数据,因此要重写Comparator类的排序方法进行倒序排序。

(二)案例实现

1. Map阶段实现

使用Eclipse开发工具打开之前创建的Maven项目HadoopDemo,并且新建cn.itcast.mr.topN包,在该路径下编写自定义Mapper类TopNMapper,主要用于将文件中的每行数据进行切割提取,并把数据保存到TreeMap中,判断TreeMap是否大于5,如果大于5就需要移除最小的数据。TreeMap保存了当前文件最大5条数据后,再输出到Reduce阶段。

2. Reduce阶段实现

根据Map阶段的输出结果形式,同样在cn.itcast.mr.topN包下,自定义Reducer类TopNReducer,主要用于编写TreeMap自定义排序规则,当需求取最大值时,只需要在compare()方法中返回正数即可满足倒序排列,reduce()方法依然是要满足时刻判断TreeMap中存放数据是前五个数,并最终遍历输出最大的5个数。

3. Driver程序主类实现

编写MapReduce程序运行主类TopNDriver,主要用于对指定的本地D:\\\\topN\\\\input目录下的源文件(需要提前准备)实现TopN分析,得到文件中最大的5个数,并将结果输入到本地D:\\\\topN\\\\output目录下。

4. 效果测试

为了保证MapReduce程序正常执行,需要先在本地D:\\\\topN\\\\input目录下创建文件num.txt;然后,执行MapReduce程序的程序入口TopNDriver类,正常执行完成后,在指定的D:\\\\topN\\\\output目录下生成结果文件。

二、完整代码

num.txt

10 3 8 7 6 5 1 2 9 4
11 12 17 14 15 20
19 18 13 16

1、TopNMapper.java

package cn.itcast.mr.topN;

import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable>
	private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context)throws IOException, InterruptedException 
		String line = value.toString();
		String[] nums = line.split(" ");
		
		for (String num :nums) 
			repToRecordMap.put(Integer.parseInt(num),"");
			if(repToRecordMap.size()>5) 
				repToRecordMap.remove(repToRecordMap.firstKey());
			
		
	
	     protected void cleanup (Mapper<LongWritable, Text, NullWritable, IntWritable>.Context context)   
	            for(Integer i : repToRecordMap.keySet())  
	            	try 
	            		context.write(NullWritable.get(), new IntWritable(i)); 
	            	catch(Exception e) 
	            		e.printStackTrace();
	            	
	                 
	              
	          



2、 TopNReducer.java

package cn.itcast.mr.topN;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> 
	private TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>(new Comparator<Integer>() 
	public int compare(Integer a, Integer b) 
    	  return b-a;
      
	);
      @Override  
      public void reduce(NullWritable key, Iterable<IntWritable> values, Reducer<NullWritable, IntWritable, NullWritable, IntWritable>.Context context) throws IOException, InterruptedException     
         
          for(IntWritable value :values) 
        	  repToRecordMap.put(value.get(),"");
        	  if(repToRecordMap.size()>5) 
        		  repToRecordMap.remove(repToRecordMap.firstKey());
        	  
          
          
          for(Integer i:repToRecordMap.keySet()) 
        	  context.write(NullWritable.get(), new IntWritable(i));
          
        
      

3、TopNDriver.java

package cn.itcast.mr.topN;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;


public class TopNDriver 
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
	      

	        Configuration conf = new Configuration();
	        Job job = Job.getInstance(conf);
	        
	        job.setJarByClass(TopNDriver.class);
	        job.setMapperClass(TopNMapper.class);
	        job.setReducerClass(TopNReducer.class);
	        //设置输出类型
	        job.setOutputKeyClass(NullWritable.class);
	        job.setOutputValueClass(IntWritable.class);
	        //设置输入和输出目录
	        FileInputFormat.addInputPath(job, new Path("F:\\\\topN\\\\input"));
	        FileOutputFormat.setOutputPath(job, new Path("F:\\\\topN\\\\output"));
	 
	        System.exit(job.waitForCompletion(true) ? 0 : 1);
	    


三、运行结果 

以上是关于MapReduce经典案例—TopN的主要内容,如果未能解决你的问题,请参考以下文章

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客

大数据讲课笔记5.7 MR案例—TopN

大数据之Hadoop(MapReduce):MapReduce扩展案例

MapReduce程序之TopN问题(排行榜问题)

MapReduce编程初步(WordCount,TopN)

Hadoop学习之十MapReduce案例分析二-好友推荐