MapReduce TopK问题实际应用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce TopK问题实际应用相关的知识,希望对你有一定的参考价值。

一:背景

TopK问题应该是海量数据处理中应用最广泛的了,比如在海量日志数据处理中,对数据清洗完成之后统计某日访问网站次数最多的前K个IP。这个问题的实现方式并不难,我们完全可以利用MapReduce的Shuffle过程实现排序,然后在Reduce端进行简单的个数判断输出即可。这里还涉及到二次排序,不懂的同学可以参考我之前的文章。

 

二:技术实现

#我们先来看看一条Ngnix服务器的日志:

 

[java] view plain copy
 
  1. 181.133.250.74 - - [06/Jan/2015:10:18:08 +0800] "GET /lavimer/love.png HTTP/1.1" 200 968   
  2. "http://www.iteblog.com/archives/994"   
  3. "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (Khtml, like Gecko)   
  4. Chrome/34.0.1847.131 Safari/537.36"  

这条日志里面含有9列(为了展示美观,我在这里加了换行符),每列之间都是用空格分割的,每列的含义分别是:客户端IP、用户标示、用户、访问时间、请求页面、请求状态、返回文件大小、跳转来源、浏览器UA

 

 

#数据清洗这里就不说了,很简单,无非就是字符串的截取和WordCount程序。现在假设经过清洗后的数据如下(第一列是IP第二列是出现次数):

 

[java] view plain copy
 
  1. 180.173.250.74  1001  
  2. 18.13.253.64    10001  
  3. 181.17.252.175  10001  
  4. 113.172.210.174 99  
  5. 186.175.251.114 89  
  6. 10.111.220.54   900  
  7. 110.199.220.23  9999  
  8. 140.143.253.11  999999  
  9. 101.133.230.24  999999  
  10. 115.171.220.14  999999  
  11. 185.172.238.48  999888  
  12. 123.17.240.74   19000  
  13. 187.124.225.74  8777  
  14. 119.173.243.74  8888  
  15. 186.173.250.89  8888  

#现在的需求是求出访问量最高的前10个IP。

 

 

代码实现如下:

把IP和其对应的出现次数封装成一个实体对象并实现WritableComparable接口用于排序。

 

[java] view plain copy
 
  1. public class IPTimes implements WritableComparable {  
  2.     //IP  
  3.     private Text ip;  
  4.     //IP对应出现的次数  
  5.     private IntWritable count;  
  6.   
  7.     //无参构造函数(一定要有,反射机制会出错,另外要对定义的变量进行初始化否则会报空指针异常)  
  8.     public IPTimes() {  
  9.         this.ip = new Text("");  
  10.         this.count = new IntWritable(1);  
  11.     }  
  12.   
  13.     //有参构造函数  
  14.     public IPTimes(Text ip, IntWritable count) {  
  15.         this.ip = ip;  
  16.         this.count = count;  
  17.     }  
  18.   
  19.     //反序列化  
  20.     public void readFields(DataInput in) throws IOException {  
  21.         ip.readFields(in);  
  22.         count.readFields(in);  
  23.     }  
  24.     //序列化  
  25.     public void write(DataOutput out) throws IOException {  
  26.         ip.write(out);  
  27.         count.write(out);  
  28.     }  
  29.     /*两个变量的getter和setter方法*/  
  30.     public Text getIp() {  
  31.         return ip;  
  32.     }  
  33.   
  34.     public void setIp(Text ip) {  
  35.         this.ip = ip;  
  36.     }  
  37.   
  38.     public IntWritable getCount() {  
  39.         return count;  
  40.     }  
  41.   
  42.     public void setCount(IntWritable count) {  
  43.         this.count = count;  
  44.     }  
  45.   
  46.       
  47.     /** 
  48.      * 这个方法是二次排序的关键 
  49.      */  
  50.     public int compareTo(Object o) {  
  51.         //强转   
  52.         IPTimes ipAndCount = (IPTimes) o;  
  53.         //对第二列的count进行比较  
  54.         long minus = this.getCount().compareTo(ipAndCount.getCount());  
  55.           
  56.         if (minus != 0){//第二列不相同时降序排列  
  57.             return ipAndCount.getCount().compareTo(this.count);  
  58.         }else {//第二列相同时第一列升序排列  
  59.             return this.ip.compareTo(ipAndCount.getIp());  
  60.         }  
  61.           
  62.     }  
  63.   
  64.     //hashCode和equals()方法  
  65.     public int hashCode() {  
  66.         return ip.hashCode();  
  67.     }  
  68.   
  69.     public boolean equals(Object o) {  
  70.         if (!(o instanceof IPTimes))  
  71.             return false;  
  72.         IPTimes other = (IPTimes) o;  
  73.         return ip.equals(other.ip) && count.equals(other.count);  
  74.     }  
  75.   
  76.     //重写toString()方法  
  77.     public String toString() {  
  78.   
  79.         return this.ip + "\t" + this.count;  
  80.     }  
  81.   
  82. }  

主类TopK.java:

 

 

[java] view plain copy
 
  1. public class TOPK {  
  2.     // 定义输入路径  
  3.     private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/topk_file/*";  
  4.     // 定义输出路径  
  5.     private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  6.   
  7.     public static void main(String[] args) {  
  8.   
  9.         try {  
  10.             // 创建配置信息  
  11.             Configuration conf = new Configuration();  
  12.   
  13.             // 创建文件系统  
  14.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  15.             // 如果输出目录存在,我们就删除  
  16.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  17.                 fileSystem.delete(new Path(OUT_PATH), true);  
  18.             }  
  19.   
  20.             // 创建任务  
  21.             Job job = new Job(conf, TOPK.class.getName());  
  22.   
  23.             //1.1 设置输入目录和设置输入数据格式化的类  
  24.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  25.             job.setInputFormatClass(TextInputFormat.class);  
  26.   
  27.             //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  28.             job.setMapperClass(TopKMapper.class);  
  29.             job.setMapOutputKeyClass(IPTimes.class);  
  30.             job.setMapOutputValueClass(Text.class);  
  31.   
  32.             //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  33.             job.setPartitionerClass(HashPartitioner.class);  
  34.             job.setNumReduceTasks(1);  
  35.   
  36.             //1.4 排序  
  37.             //1.5 归约  
  38.             //2.1 Shuffle把数据从Map端拷贝到Reduce端。  
  39.             //2.2 指定Reducer类和输出key和value的类型  
  40.             job.setReducerClass(TopkReducer.class);  
  41.             job.setOutputKeyClass(IPTimes.class);  
  42.             job.setOutputValueClass(Text.class);  
  43.   
  44.             //2.3 指定输出的路径和设置输出的格式化类  
  45.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  46.             job.setOutputFormatClass(TextOutputFormat.class);  
  47.   
  48.             // 提交作业 退出  
  49.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  50.   
  51.         } catch (Exception e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.     }  
  55.   
  56.     public static class TopKMapper extends Mapper<LongWritable, Text, IPTimes, Text> {  
  57.         @Override  
  58.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IPTimes, Text>.Context context) throws IOException, InterruptedException {  
  59.             //切分字符串  
  60.             String[] splits = value.toString().split("\t");  
  61.             // 创建IPCount对象  
  62.             IPTimes tmp = new IPTimes(new Text(splits[0]), new IntWritable(Integer.valueOf(splits[1])));  
  63.             // 把结果写出去  
  64.             context.write(tmp, new Text());  
  65.   
  66.         }  
  67.   
  68.         public static class TopkReducer extends Reducer<IPTimes, Text, IPTimes, Text> {  
  69.             //临时变量  
  70.             int counter = 0;  
  71.             //TOPK中的K  
  72.             int k = 10;  
  73.   
  74.             @Override  
  75.             protected void reduce(IPTimes key, Iterable<Text> values, Reducer<IPTimes, Text, IPTimes, Text>.Context context) throws IOException,  
  76.                     InterruptedException {  
  77.                 if (counter < k) {  
  78.                     context.write(key, null);  
  79.                     counter++;  
  80.                 }  
  81.             }  
  82.         }  
  83.     }  
  84. }  


程序运行的结果如下:

 

技术分享

 

注:网上有很多关于TOPK的求法,很多都用了TreeMap这个数据结构,但是我测试过他们写的很多程序,有一个很严重的问题就是当数字相同时记录就会被抛弃。但这是不符合实际需求的,现实中完全有可能出现两个不同的IP访问的次数相同的情况。

以上是关于MapReduce TopK问题实际应用的主要内容,如果未能解决你的问题,请参考以下文章

堆排序应用之topK问题

Hadoop- MapReduce在实际应用中常见的调优

数据结构之堆的应用—TopK问题

MapReduce 任务数

Hadoop学习笔记—12.MapReduce中的常见算法

堆排序和TopK问题