MapReduce表连接之半连接SemiJoin

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce表连接之半连接SemiJoin相关的知识,希望对你有一定的参考价值。

一:背景

SemiJoin,一般称为半连接,其原理是在Map端过滤掉一些不需要join的数据,从而大大减少了reduce和Shuffle的时间,因为我们知道,如果仅仅使用Reduce端连接,那么如果一份数据,存在大量的无效数据,而这些数据在join中并不需要,但是因为没有做过预处理,所以这些数据直到真正执行reduce函数时,才被定义为无效数据,但是这个时候已经执行过了Shuffle、merge还有sort操作,所以这部分无效的数据就浪费了大量的网络IO和磁盘IO,所以在整体来讲,这是一种降低性能的表现,如果存在的无效数据越多,那么这种趋势就越明显。之所以会出现半连接,这其实是reduce端连接的一个变种,只不过是我们在Map端过滤掉了一些无效的数据,所以减少了reduce过程的Shuffle时间,所以能获取一个性能的提升。

 

二:技术实现

(1):利用DistributedCache将小表分发到各个节点上,在Map过程的setup()函数里,读取缓存里的文件,只将小表的连接键存储在hashSet中。

(2):在map()函数执行时,对每一条数据进行判断,如果这条数据的连接键为空或者在hashSet里不存在,那么则认为这条数据无效,使条数据也不参与reduce的过程。

注:从以上步骤就可以发现,这种做法很明显可以提升join性能,但是要注意的是小表的key如果非常大的话,可能会出现OOM的情况,这时我们就需要考虑其他的连接方式了。

 

测试数据如下:

/semi_jon/a.txt:

 

[java] view plain copy
 
  1. 1,三劫散仙,13575468248   
  2. 2,凤舞九天,18965235874   
  3. 3,忙忙碌碌,15986854789   
  4. 4,少林寺方丈,15698745862  


/semi_join/b.txt:

 

 

[java] view plain copy
 
  1. 3,A,99,2013-03-05   
  2. 1,B,89,2013-02-05   
  3. 2,C,69,2013-03-09   
  4. 3,D,56,2013-06-07   
  5. 5,E,100,2013-09-09   
  6. 6,H,200,2014-01-10  


#需求就是对上面两个表做半连接。

 

 

实现代码如下:

 

[java] view plain copy
 
  1. public class SemiJoin {  
  2.     // 定义输入路径  
  3.     private static  String INPUT_PATH1 = "";  
  4.     private static  String INPUT_PATH2 = "";  
  5.     // 定义输出路径  
  6.     private static  String OUT_PATH = "";  
  7.   
  8.     public static void main(String[] args) {  
  9.   
  10.         try {  
  11.             // 创建配置信息  
  12.             Configuration conf = new Configuration();  
  13.             // 获取命令行的参数  
  14.             String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  15.             // 当参数违法时,中断程序  
  16.             if (otherArgs.length != 3) {  
  17.                 System.err.println("Usage:Semi_join<in1> <in2> <out>");  
  18.                 System.exit(1);  
  19.             }  
  20.   
  21.             // 给路径赋值  
  22.             INPUT_PATH1 = otherArgs[0];  
  23.             INPUT_PATH2 = otherArgs[1];  
  24.             OUT_PATH = otherArgs[2];  
  25.               
  26.             // 把小表添加到共享Cache里  
  27.             DistributedCache.addCacheFile(new URI(INPUT_PATH1), conf);  
  28.   
  29.             // 创建文件系统  
  30.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  31.             // 如果输出目录存在,我们就删除  
  32.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  33.                 fileSystem.delete(new Path(OUT_PATH), true);  
  34.             }  
  35.   
  36.             // 创建任务  
  37.             Job job = new Job(conf, SemiJoin.class.getName());  
  38.   
  39.             // 设置成jar包  
  40.             job.setJarByClass(SemiJoin.class);  
  41.   
  42.             //1.1 设置输入目录和设置输入数据格式化的类  
  43.             FileInputFormat.setInputPaths(job, INPUT_PATH2);  
  44.             job.setInputFormatClass(TextInputFormat.class);  
  45.   
  46.             //1.2设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  47.             job.setMapperClass(SemiJoinMapper.class);  
  48.             job.setMapOutputKeyClass(Text.class);  
  49.             job.setMapOutputValueClass(CombineEntity.class);  
  50.   
  51.             //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  52.             job.setPartitionerClass(HashPartitioner.class);  
  53.             job.setNumReduceTasks(1);  
  54.   
  55.             //1.4 排序  
  56.             //1.5 归约  
  57.             //2.1 Shuffle把数据从Map端拷贝到Reduce端。  
  58.             //2.2 指定Reducer类和输出key和value的类型  
  59.             job.setReducerClass(SemiJoinReducer.class);  
  60.             job.setOutputKeyClass(Text.class);  
  61.             job.setOutputValueClass(Text.class);  
  62.   
  63.             //2.3 指定输出的路径和设置输出的格式化类  
  64.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  65.             job.setOutputFormatClass(TextOutputFormat.class);  
  66.   
  67.             // 提交作业 退出  
  68.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  69.   
  70.         } catch (Exception e) {  
  71.             e.printStackTrace();  
  72.         }  
  73.     }  
  74.   
  75.     /** 
  76.      * 自定义Mapper函数 
  77.      *  
  78.      * @author 廖*民 time : 2015年1月21日下午8:40:43 
  79.      * @version 
  80.      */  
  81.     public static class SemiJoinMapper extends Mapper<LongWritable, Text, Text, CombineEntity> {  
  82.         // 创建相关对象  
  83.         private CombineEntity combine = new CombineEntity();  
  84.         private Text flag = new Text();  
  85.         private Text joinKey = new Text();  
  86.         private Text secondPart = new Text();  
  87.         // 存储小表的key  
  88.         private HashSet<String> joinKeySet = new HashSet<String>();  
  89.   
  90.         @Override  
  91.         protected void setup(Mapper<LongWritable, Text, Text, CombineEntity>.Context context) throws IOException, InterruptedException {  
  92.             // 读取文件流  
  93.             BufferedReader br = null;  
  94.             String temp = "";  
  95.             // 获取DistributedCached里面的共享文件  
  96.             Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
  97.   
  98.             System.out.println("=================================>"+paths.length);  
  99.             // 遍历path数组  
  100.             for (Path path : paths) {  
  101.                 if (path.getName().endsWith("a.txt")) {  
  102.                     // 创建读取文件流  
  103.                     br = new BufferedReader(new FileReader(path.toString()));  
  104.   
  105.                     // 读取数据  
  106.                     while ((temp = br.readLine()) != null) {  
  107.                         // 按","切割  
  108.                         String[] splits = temp.split(",");  
  109.                         // 将key加入小表中  
  110.                         joinKeySet.add(splits[0]);  
  111.                     }  
  112.                 }  
  113.             }  
  114.   
  115.         }  
  116.   
  117.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CombineEntity>.Context context) throws IOException,  
  118.                 InterruptedException {  
  119.             // 获取文件输入路径  
  120.             String pathName = ((FileSplit) (context.getInputSplit())).getPath().toString();  
  121.               
  122.             System.out.println("Map中获取的路径没有a.txt吧?"+pathName);  
  123.               
  124.             if (pathName.endsWith("a.txt")) {  
  125.                 String[] valuesTemps = value.toString().split(",");  
  126.                   
  127.                 System.out.println("进入a.txt==================>a中的字符串:"+value.toString());  
  128.                 // 在这里过滤必须要连接的字符  
  129.                 if (joinKeySet.contains(valuesTemps[0])) {  
  130.                     // 设置标志位  
  131.                     flag.set("0");  
  132.                     // 设置连接键  
  133.                     joinKey.set(valuesTemps[0]);  
  134.                     // 设置第二部分  
  135.                     secondPart.set(valuesTemps[1] + "\t" + valuesTemps[1]);  
  136.                     // 封装实体  
  137.                     combine.setFlag(flag);  
  138.                     combine.setJoinKey(joinKey);  
  139.                     combine.setSecondPart(secondPart);  
  140.   
  141.                     // 写出去  
  142.                     context.write(combine.getJoinKey(), combine);  
  143.                 } else {  
  144.                     System.out.println("a.txt里");  
  145.                     System.out.println("小表中没有此记录");  
  146.                     for (String v : valuesTemps) {  
  147.                         System.out.println(v + " ");  
  148.                     }  
  149.   
  150.                     return;  
  151.                 }  
  152.             } else if (pathName.endsWith("b.txt")) {  
  153.                   
  154.                 System.out.println("进入b.txt==================>b中的字符串:"+value.toString());  
  155.                 // 切割  
  156.                 String[] valueItems = value.toString().split(",");  
  157.                 // 判断是否在集合中  
  158.                 if (joinKeySet.contains(valueItems[0])) {  
  159.                     // 设置标志位  
  160.                     flag.set("1");  
  161.                     // 设置连接键  
  162.                     joinKey.set(valueItems[0]);  
  163.                     // 设置第二部分数据,注意:不同文件的列数不一样  
  164.                     secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3]);  
  165.   
  166.                     // 封装实体  
  167.                     combine.setFlag(flag);  
  168.                     combine.setJoinKey(joinKey);  
  169.                     combine.setSecondPart(secondPart);  
  170.   
  171.                     // 写出去  
  172.                     context.write(combine.getJoinKey(), combine);  
  173.                 } else {  
  174.                     System.out.println("b.txt里");  
  175.                     System.out.println("小表中没有此记录");  
  176.                     for (String v : valueItems) {  
  177.                         System.out.println(v + " ");  
  178.                     }  
  179.   
  180.                     return;  
  181.                 }  
  182.   
  183.             }  
  184.         }  
  185.     }  
  186.   
  187.     /** 
  188.      * 自定义Reducer函数 
  189.      *  
  190.      * @author 廖*民 time : 2015年1月21日下午8:41:01 
  191.      * @version 
  192.      */  
  193.     public static class SemiJoinReducer extends Reducer<Text, CombineEntity, Text, Text> {  
  194.   
  195.         // 存储一个分组中左表信息  
  196.         private List<Text> leftTable = new ArrayList<Text>();  
  197.         // 存储一个分组中右表数据  
  198.         private List<Text> rightTable = new ArrayList<Text>();  
  199.   
  200.         private Text secondPart = null;  
  201.         private Text outPut = new Text();  
  202.   
  203.         // 一个分组调用一次reduce()函数  
  204.         protected void reduce(Text key, Iterable<CombineEntity> values, Reducer<Text, CombineEntity, Text, Text>.Context context) throws IOException,  
  205.                 InterruptedException {  
  206.             // 清空分组数据  
  207.             leftTable.clear();  
  208.             rightTable.clear();  
  209.   
  210.             // 将不同文件的数据,分别放在不同的集合中;注意数据过大时,会出现OOM  
  211.             for (CombineEntity val : values) {  
  212.                 this.secondPart = new Text(val.getSecondPart().toString());  
  213.                 System.out.println("传到reduce中的secondPart部分:" + this.secondPart);  
  214.                   
  215.                 System.out.println("难道A表中就没有数据:" + val.getFlag().toString().trim().equals("0"));  
  216.                 // 左表  
  217.                 if (val.getFlag().toString().trim().equals("0")) {  
  218.                     leftTable.add(secondPart);  
  219.                 } else if (val.getFlag().toString().trim().equals("1")) {  
  220.                     rightTable.add(secondPart);  
  221.                 }  
  222.             }  
  223.             for (Text val : leftTable){  
  224.                 System.out.println("A 表中的数据为:" + val);  
  225.             }  
  226.             for (Text val : rightTable){  
  227.                 System.out.println("B 表中的数据为:" + val);  
  228.             }  
  229.             // 做笛卡尔积输出我们想要的连接数据  
  230.             for (Text left : leftTable) {  
  231.                 for (Text right : rightTable) {  
  232.                     outPut.set(left + "\t" + right);  
  233.                     // 将数据写出  
  234.                     context.write(key, outPut);  
  235.                 }  
  236.             }  
  237.   
  238.         }  
  239.     }  
  240. }  
  241.   
  242. /** 
  243.  * 自定义实体 
  244.  *  
  245.  * @author 廖*民 time : 2015年1月21日下午8:41:18 
  246.  * @version 
  247.  */  
  248. class CombineEntity implements WritableComparable<CombineEntity> {  
  249.   
  250.     // 连接key  
  251.     private Text joinKey;  
  252.     // 文件来源标志  
  253.     private Text flag;  
  254.     // 除了键外的其他部分的数据  
  255.     private Text secondPart;  
  256.   
  257.     // 无参构造函数  
  258.     public CombineEntity() {  
  259.         this.joinKey = new Text();  
  260.         this.flag = new Text();  
  261.         this.secondPart = new Text();  
  262.     }  
  263.   
  264.     // 有参构造函数  
  265.     public CombineEntity(Text joinKey, Text flag, Text secondPart) {  
  266.         this.joinKey = joinKey;  
  267.         this.flag = flag;  
  268.         this.secondPart = secondPart;  
  269.     }  
  270.   
  271.     public Text getJoinKey() {  
  272.         return joinKey;  
  273.     }  
  274.   
  275.     public void setJoinKey(Text joinKey) {  
  276.         this.joinKey = joinKey;  
  277.     }  
  278.   
  279.     public Text getFlag() {  
  280.         return flag;  
  281.     }  
  282.   
  283.     public void setFlag(Text flag) {  
  284.         this.flag = flag;  
  285.     }  
  286.   
  287.     public Text getSecondPart() {  
  288.         return secondPart;  
  289.     }  
  290.   
  291.     public void setSecondPart(Text secondPart) {  
  292.         this.secondPart = secondPart;  
  293.     }  
  294.   
  295.     public void write(DataOutput out) throws IOException {  
  296.         this.joinKey.write(out);  
  297.         this.flag.write(out);  
  298.         this.secondPart.write(out);  
  299.     }  
  300.   
  301.     public void readFields(DataInput in) throws IOException {  
  302.         this.joinKey.readFields(in);  
  303.         this.flag.readFields(in);  
  304.         this.secondPart.readFields(in);  
  305.     }  
  306.   
  307.     public int compareTo(CombineEntity o) {  
  308.         return this.joinKey.compareTo(o.joinKey);  
  309.     }  
  310.   
  311. }  


打成jar包,运行命令如下:

 

 

[java] view plain copy
 
  1. hadoop jar join.jar /semi_join/a.txt /semi_join/* /out  


注:a.txt是要加入到内存的表,/semi_join/*是要进入map()函数进行比对的目录,/out是输出目录。

 

 

程序运行的结果为:

 

技术分享

 

以上是关于MapReduce表连接之半连接SemiJoin的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce编程之Semi Join多种应用场景与使用

SQl 语句 表的连接

TCP之半关闭与CLOSE_WAIT

半连接和内连接的区别

深入浅出TCP之半关闭与CLOSE_WAIT

MySQL优化 — exists与in谁快谁慢?