MapReduce表连接操作之Reduce端join

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce表连接操作之Reduce端join相关的知识,希望对你有一定的参考价值。

一:背景

Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程。

 

二:技术实现

基本思路

(1):Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的。

(2):在reduce处理函数中,按照标识对数据进行处理。

(3):然后根据Key去join来求出结果直接输出。

 

数据准备

准备好下面两张表:

(1):tb_a(以下简称表A)

 

[java] view plain copy
 
  1. id  name  
  2. 1   北京  
  3. 2   天津  
  4. 3   河北  
  5. 4   山西  
  6. 5   内蒙古  
  7. 6   辽宁  
  8. 7   吉林  
  9. 8   黑龙江  



 

(2):tb_b(以下简称表B)

 

[java] view plain copy
 
  1. id  statyear    num  
  2. 1   2010    1962  
  3. 1   2011    2019  
  4. 2   2010    1299  
  5. 2   2011    1355  
  6. 4   2011    3574  
  7. 4   2011    3593  
  8. 9   2010    2303  
  9. 9   2011    2347  


#需求就是以id为key做join操作(注:上面的数据都是以制表符“\t”分割)

 

 

计算模型

整个计算过程是:

(1):在Map阶段,把所有数据标记成<key,value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于A的记录,value的值为"a#"+name;来源于B的记录,value的值为"b#"+score。

(2):在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终的结果。

 

如下图所示:

技术分享

代码实现如下:

 

[java] view plain copy
 
  1. public class ReduceJoinTest {  
  2.   
  3.         // 定义输入路径  
  4.         private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/table_join/tb_*";  
  5.         // 定义输出路径  
  6.         private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  7.   
  8.         public static void main(String[] args) {  
  9.   
  10.             try {  
  11.                 // 创建配置信息  
  12.                 Configuration conf = new Configuration();  
  13.                   
  14.   
  15.                 // 创建文件系统  
  16.                 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  17.                 // 如果输出目录存在,我们就删除  
  18.                 if (fileSystem.exists(new Path(OUT_PATH))) {  
  19.                     fileSystem.delete(new Path(OUT_PATH), true);  
  20.                 }  
  21.   
  22.                 // 创建任务  
  23.                 Job job = new Job(conf, ReduceJoinTest.class.getName());  
  24.   
  25.                 //1.1   设置输入目录和设置输入数据格式化的类  
  26.                 FileInputFormat.setInputPaths(job, INPUT_PATH);  
  27.                 job.setInputFormatClass(TextInputFormat.class);  
  28.   
  29.                 //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  30.                 job.setMapperClass(ReduceJoinMapper.class);  
  31.                 job.setMapOutputKeyClass(Text.class);  
  32.                 job.setMapOutputValueClass(Text.class);  
  33.   
  34.                 //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  35.                 job.setPartitionerClass(HashPartitioner.class);  
  36.                 job.setNumReduceTasks(1);  
  37.   
  38.                 //1.4   排序  
  39.                 //1.5   归约  
  40.                 //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
  41.                 //2.2   指定Reducer类和输出key和value的类型  
  42.                 job.setReducerClass(ReduceJoinReducer.class);  
  43.                 job.setOutputKeyClass(Text.class);  
  44.                 job.setOutputValueClass(Text.class);  
  45.   
  46.                 //2.3   指定输出的路径和设置输出的格式化类  
  47.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  48.                 job.setOutputFormatClass(TextOutputFormat.class);  
  49.   
  50.   
  51.                 // 提交作业 退出  
  52.                 System.exit(job.waitForCompletion(true) ? 0 : 1);  
  53.               
  54.             } catch (Exception e) {  
  55.                 e.printStackTrace();  
  56.             }  
  57.         }  
  58.           
  59.     public static  class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text>{  
  60.         @Override  
  61.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {  
  62.               //获取输入文件的全路径和名称  
  63.               FileSplit fileSplit = (FileSplit) context.getInputSplit();  
  64.               String path = fileSplit.getPath().toString();  
  65.                 
  66.               //获取输入记录的字符串  
  67.               String line = value.toString();  
  68.                 
  69.               //抛弃空记录  
  70.               if (line == null || line.equals("")){  
  71.                   return;  
  72.               }  
  73.                 
  74.               //处理来自tb_a表的记录  
  75.               if (path.contains("tb_a")){  
  76.                   //按制表符切割  
  77.                   String[] values = line.split("\t");  
  78.                   //当数组长度小于2时,视为无效记录  
  79.                   if (values.length < 2){  
  80.                       return;  
  81.                   }  
  82.                   //获取id和name  
  83.                   String id = values[0];  
  84.                   String name = values[1];  
  85.                     
  86.                   //把结果写出去  
  87.                   context.write(new Text(id), new Text("a#" + name));  
  88.               } else if (path.contains("tb_b")){  
  89.                   //按制表符切割  
  90.                   String[] values = line.split("\t");  
  91.                   //当长度不为3时,视为无效记录  
  92.                   if (values.length < 3){  
  93.                       return;  
  94.                   }  
  95.                     
  96.                   //获取属性  
  97.                   String id = values[0];  
  98.                   String statyear = values[1];  
  99.                   String num = values[2];  
  100.                     
  101.                   //写出去  
  102.                   context.write(new Text(id), new Text("b#" + statyear + "  " + num));  
  103.               }  
  104.               
  105.         }  
  106.           
  107.         public static class ReduceJoinReducer extends Reducer<Text, Text, Text, Text>{  
  108.               
  109.             @Override  
  110.             protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {  
  111.                   
  112.                 //用来存放来自tb_a表的数据  
  113.                 Vector<String> vectorA = new Vector<String>();  
  114.                 //用来存放来自tb_b表的  
  115.                 Vector<String> vectorB = new Vector<String>();  
  116.                   
  117.                 //迭代集合数据  
  118.                 for (Text val : values){  
  119.                     //将集合中的数据对应添加到Vector中  
  120.                     if (val.toString().startsWith("a#")){  
  121.                         vectorA.add(val.toString().substring(2));  
  122.                     } else if (val.toString().startsWith("b#")){  
  123.                         vectorB.add(val.toString().substring(2));  
  124.                     }  
  125.                 }  
  126.                   
  127.                 //获取两个Vector集合的长度  
  128.                 int sizeA = vectorA.size();  
  129.                 int sizeB = vectorB.size();  
  130.                   
  131.                 //遍历两个向量将结果写出去  
  132.                 for (int i=0; i<sizeA; i++){  
  133.                     for (int j=0; j<sizeB; j++){  
  134.                         context.write(key, new Text("   " + vectorA.get(i) + "  " + vectorB.get(j)));  
  135.                     }  
  136.                 }  
  137.                   
  138.                   
  139.             }  
  140.         }  
  141.     }  
  142. }  


程序运行的结果:

 

技术分享

 

细节:

(1):当map读取源文件时,如何区分出是file1还是file2?

 

[java] view plain copy
 
  1. FileSplit fileSplit = (FileSplit)context.getInputSplit();  
  2.         String path =  fileSplit.getPath().toString();  

根据path就可以知道文件的来源咯。

以上是关于MapReduce表连接操作之Reduce端join的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce表连接之半连接SemiJoin

走进大数据丨 MapReduce之数据倾斜

大数据之Hadoop(MapReduce):Reduce输出端采用压缩

MapReduce-join连接

大数据之Hadoop(MapReduce):Join之Reduce Join应用

Mapreduce实例--求平均值