mapreduce join操作

Posted 呢喃的歌声

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mapreduce join操作相关的知识,希望对你有一定的参考价值。

上次和朋友讨论到mapreduce,join应该发生在map端,理由太想当然到sql里面的执行过程了 wheremap端 join在map之前(笛卡尔积),但实际上网上看了,mapreduce的笛卡尔积发生在reduce端,下面哥们有个实现过程可以参考(http://blog.csdn.net/xyilu/article/details/8996204)。有空再看看 实际上实现过程是不是和他写的代码一样。
 
 
 
 
 
 
前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地。今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到算法实现还有很多路要走。
 

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。
准备好下面两张表:
(1)m_ys_lab_jointest_a(以下简称表A)
建表语句为:
[sql] view plain copy
 
 print?
  1. create table if not exists m_ys_lab_jointest_a (  
  2.      id bigint,  
  3.      name string  
  4. )  
  5. row format delimited  
  6. fields terminated by ‘9‘  
  7. lines terminated by ‘10‘  
  8. stored as textfile;  
数据:
id     name
1     北京
2     天津
3     河北
4     山西
5     内蒙古
6     辽宁
7     吉林
8     黑龙江
(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:
[sql] view plain copy
 
 print?
  1. create table if not exists m_ys_lab_jointest_b (  
  2.      id bigint,  
  3.      statyear bigint,  
  4.      num bigint  
  5. )  
  6. row format delimited  
  7. fields terminated by ‘9‘  
  8. lines terminated by ‘10‘  
  9. stored as textfile;  
数据:
id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

我们的目的是,以id为key做join操作,得到以下表:
m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

计算模型

整个计算过程是:
(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
如下图所示:
技术分享

代码

代码如下:
[java] view plain copy
 
 print?
  1. import java.io.IOException;  
  2. import java.util.HashMap;  
  3. import java.util.Iterator;  
  4. import java.util.Vector;  
  5.   
  6. import org.apache.hadoop.io.LongWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.Writable;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.MapReduceBase;  
  12. import org.apache.hadoop.mapred.Mapper;  
  13. import org.apache.hadoop.mapred.OutputCollector;  
  14. import org.apache.hadoop.mapred.RecordWriter;  
  15. import org.apache.hadoop.mapred.Reducer;  
  16. import org.apache.hadoop.mapred.Reporter;  
  17.   
  18. /** 
  19.  * MapReduce实现Join操作 
  20.  */  
  21. public class MapRedJoin {  
  22.     public static final String DELIMITER = "\u0009"; // 字段分隔符  
  23.       
  24.     // map过程  
  25.     public static class MapClass extends MapReduceBase implements  
  26.             Mapper<LongWritable, Text, Text, Text> {  
  27.                           
  28.         public void configure(JobConf job) {  
  29.             super.configure(job);  
  30.         }  
  31.           
  32.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
  33.                 Reporter reporter) throws IOException, ClassCastException {  
  34.             // 获取输入文件的全路径和名称  
  35.             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
  36.             // 获取记录字符串  
  37.             String line = value.toString();  
  38.             // 抛弃空记录  
  39.             if (line == null || line.equals("")) return;   
  40.               
  41.             // 处理来自表A的记录  
  42.             if (filePath.contains("m_ys_lab_jointest_a")) {  
  43.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
  44.                 if (values.length < 2) return;  
  45.                   
  46.                 String id = values[0]; // id  
  47.                 String name = values[1]; // name  
  48.                   
  49.                 output.collect(new Text(id), new Text("a#"+name));  
  50.             }  
  51.             // 处理来自表B的记录  
  52.             else if (filePath.contains("m_ys_lab_jointest_b")) {  
  53.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
  54.                 if (values.length < 3) return;  
  55.                   
  56.                 String id = values[0]; // id  
  57.                 String statyear = values[1]; // statyear  
  58.                 String num = values[2]; //num  
  59.                   
  60.                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
  61.             }  
  62.         }  
  63.     }  
  64.       
  65.     // reduce过程  
  66.     public static class Reduce extends MapReduceBase  
  67.             implements Reducer<Text, Text, Text, Text> {  
  68.         public void reduce(Text key, Iterator<Text> values,  
  69.                 OutputCollector<Text, Text> output, Reporter reporter)  
  70.                 throws IOException {  
  71.                       
  72.             Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  
  73.             Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  
  74.               
  75.             while (values.hasNext()) {  
  76.                 String value = values.next().toString();  
  77.                 if (value.startsWith("a#")) {  
  78.                     vecA.add(value.substring(2));  
  79.                 } else if (value.startsWith("b#")) {  
  80.                     vecB.add(value.substring(2));  
  81.                 }  
  82.             }  
  83.               
  84.             int sizeA = vecA.size();  
  85.             int sizeB = vecB.size();  
  86.               
  87.             // 遍历两个向量  
  88.             int i, j;  
  89.             for (i = 0; i < sizeA; i ++) {  
  90.                 for (j = 0; j < sizeB; j ++) {  
  91.                     output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
  92.                 }  
  93.             }     
  94.         }  
  95.     }  
  96.       
  97.     protected void configJob(JobConf conf) {  
  98.         conf.setMapOutputKeyClass(Text.class);  
  99.         conf.setMapOutputValueClass(Text.class);  
  100.         conf.setOutputKeyClass(Text.class);  
  101.         conf.setOutputValueClass(Text.class);  
  102.         conf.setOutputFormat(ReportOutFormat.class);  
  103.     }  
  104. }  

技术细节

下面说一下其中的若干技术细节:
(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。























以上是关于mapreduce join操作的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce-join连接

使用MapReduce实现两个文件的Join操作

MapReduce实现两表的Join--原理及python和java代码实现

MapReduce实现两表的Join--原理及python和java代码实现

MapReduce实现两表的Join--原理及python和java代码实现

MapReduce Join的使用