MapReduce实现两表的Join--原理及python和java代码实现

Posted 默一鸣

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce实现两表的Join--原理及python和java代码实现相关的知识,希望对你有一定的参考价值。

用Hive一句话搞定的,但是有时必须要用mapreduce

方法介绍

 

1. 概述

在传统数据库(如:mysql)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。
本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法。

 

2. 常见的join方法介绍

假设要进行join的数据分别来自File1和File2.

 

 

2.1 reduce side join

reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。
在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

 

2.2 map side join

之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

 

2.3 SemiJoin

SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。
实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
更多关于半连接的介绍,可参考:半连接介绍:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

 

2.4 reduce side join + BloomFilter

在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。
BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。
因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
更多关于BloomFilter的介绍,可参考:http://blog.csdn.net/jiaomeng/article/details/1495500

 

3. 二次排序

在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是按照value排序的。这种应用需求在join操作中很常见,比如,希望相同的key中,小表对应的value排在前面。
有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。
对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。
对于value-to-key conversion,主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法,可用于设置排序group的key值,

 

 

 

 

reduce-side-join python代码

 

hadoop有个工具叫做steaming,能够支持python、shell、C++、php等其他任何支持标准输入stdin及标准输出stdout的语言,其运行原理可以通过和标准java的map-reduce程序对比来说明:

使用原生java语言实现Map-reduce程序

  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理后,输出O1
  3. hadoop将O1打散、排序,然后传给不同的reduce机器
  4. 每个reduce机器将传来的数据传给reduce程序
  5. reduce程序将数据处理,输出最终数据O2

借助hadoop streaming使用python语言实现Map-reduce程序

  1. hadoop准备好数据后,将数据传送给java的map程序
  2. java的map程序将数据处理成“键/值”对,并传送给python的map程序
  3. python的map程序将数据处理后,将结果传回给java的map程序
  4. java的map程序将数据输出为O1
  5. hadoop将O1打散、排序,然后传给不同的reduce机器
  6. 每个reduce机器将传来的数据处理成“键/值”对,并传送给python的reduce程序
  7. python的reduce程序将数据处理后,将结果返回给java的reduce程序
  8. java的reduce程序将数据处理,输出最终数据O2

上面红色表示map的对比,蓝色表示reduce的对比,可以看出streaming程序多了一步中间处理,这样说来steaming程序的效率和性能应该低于java版的程序,然而python的开发效率、运行性能有时候会大于java,这就是streaming的优势所在。

hadoop之实现集合join的需求

hadoop是用来做数据分析的,大都是对集合进行操作,因此该过程中将集合join起来使得一个集合能得到另一个集合对应的信息的需求非常常见。

比如以下这个需求,有两份数据:学生信息(学号,姓名)和学生成绩(学号、课程、成绩),特点是有个共同的主键“学号”,现在需要将两者结合起来得到数据(学号,姓名,课程,成绩),计算公式:

学号,姓名) join (学号,课程,成绩)= (学号,姓名,课程,成绩)

数据事例1-学生信息:

学号sno姓名name
01name1
02name2
03name3
04name4

数据事例2:-学生成绩:

学号sno课程号courseno成绩grade
010180
010290
020182
020295

期待的最终输出:

学号sno姓名name课程courseno成绩grade
01name10180
01name10290
02name20182
02name20295

实现join的注意点和易踩坑总结

如果你想写一个完善健壮的map reduce程序,我建议你首先弄清楚输入数据的格式、输出数据的格式,然后自己手动构建输入数据并手动计算出输出数据,这个过程中你会发现一些写程序中需要特别处理的地方:

  1. 实现join的key是哪个,是1个字段还是2个字段,本例中key是sno,1个字段
  2. 每个集合中key是否可以重复,本例中数据1不可重复,数据2的key可以重复
  3. 每个集合中key的对应值是否可以不存在,本例中有学生会没成绩,所以数据2的key可以为空

第1条会影响到hadoop启动脚本中key.fields和partition的配置,第2条会影响到map-reduce程序中具体的代码实现方式,第3条同样影响代码编写方式。

hadoop实现join操作的思路

具体思路是给每个数据源加上一个数字标记label,这样hadoop对其排序后同一个字段的数据排在一起并且按照label排好序了,于是直接将相邻相同key的数据合并在一起输出就得到了结果。

1、 map阶段:给表1和表2加标记,其实就是多输出一个字段,比如表一加标记为0,表2加标记为2;

2、 partion阶段:根据学号key为第一主键,标记label为第二主键进行排序和分区

3、 reduce阶段:由于已经按照第一主键、第二主键排好了序,将相邻相同key数据合并输出

hadoop使用python实现join的map和reduce代码

mapper.py的代码:

 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # -*- coding: utf-8 -*- #Mapper.py import os import sys   #mapper脚本 def mapper ( ) : #获取当前正在处理的文件的名字,这里我们有两个输入文件 #所以要加以区分 filepath = os . environ [ "map_input_file" ] filename = os . path . split ( filepath ) [ - 1 ] for line in sys . stdin : if line . strip ( ) == "" : continue fields = line [ : - 1 ] . split ( "\\t" ) sno = fields [ 0 ] #以下判断filename的目的是不同的文件有不同的字段,并且需加上不同的标记 if filename == 'data_info' : name = fields [ 1 ] #下面的数字'0'就是为数据源1加上的统一标记 print '\\t' . join ( ( sno , '0' , name ) ) elif filename == 'data_grade' : courseno = fields [ 1 ] grade = fields [ 2 ] #下面的数字'1'就是为数据源1加上的统一标记 print '\\t' . join ( ( sno , '1' , courseno , grade ) )   if __name__ == '__main__' : mapper ( )

reducer的代码:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 # -*- coding: utf-8 -*- #reducer.py import sys   def reducer ( ) : #为了记录和上一个记录的区别,用lastsno记录上个sno lastsno = ""   for linein sys . stdin : if line . strip ( ) == "" : continue fields = line [ : - 1 ] . split ( "\\t" ) sno = fields [ 0 ] '' ' 处理思路: 遇见当前key与上一条key不同并且label=0,就记录下来name值, 当前key与上一条key相同并且label==1,则将本条数据的courseno、 grade联通上一条记录的name一起输出成最终结果 ' '' if sno != lastsno : name = "" #这里没有判断label==1的情况, #因为sno!=lastno,并且label=1表示该条key没有数据源1的数据 if fields [ 1 ] == "0" : name = fields [ 2 ] elif sno == lastno : #这里没有判断label==0的情况, #因为sno==lastno并且label==0表示该条key没有数据源2的数据 if fields [ 2 ] == "1" : courseno = fields [ 2 ] grade = fields [ 3 ] if name : print '\\t' . join ( ( lastsno , name , courseno , grade ) ) lastsno = sno   if __name__ == '__main__' : reducer ( )

使用shell脚本启动hadoop程序的方法:

 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #先删除输出目录 ~ / hadoop - client / hadoop / bin / hadoop fs - rmr / hdfs / jointest / output #注意,下面配置中的环境值每个人机器不一样 ~ / hadoop - client / hadoop / bin / hadoopstreaming \\ -D mapred . map . tasks = 10 \\ -D mapred . reduce . tasks = 5 \\ -D mapred . job . map . capacity = 10 \\ -D mapred . job . reduce . capacity = 5 \\ -D mapred . job . name = "join--sno_name-sno_courseno_grade" \\ -D num . key . fields .for . partition = 1 \\ -D stream . num . map . output . key . fields = 2 \\ - partitioner org . apache . hadoop . mapred . lib .KeyFieldBasedPartitioner \\ -input "/hdfs/jointest/input/*" \\ -output "/hdfs/jointest/output" \\ -mapper "python26/bin/python26.sh mapper.py" \\ -reducer "python26/bin/python26.sh reducer.py" \\ -file "mapper.py" \\ -file "reducer.py" \\ -cacheArchive "/share/python26.tar.gz#python26"   #看看运行成功没,若输出0则表示成功了 echo $ ?

可以自己手工构造输入输出数据进行测试,本程序是验证过的。

更多需要注意的地方

hadoop的join操作可以分为很多类型,各种类型脚本的编写有所不同,其分类是按照key字段数目、value字段数目、key是否可重复来划分的,以下是一个个人总结的对照表,表示会影响的地方:

影响类型影响的范围
key字段数目1、启动脚本中num.key.fields.for.partition的配置2、启动脚本中stream.num.map.output.key.fields的配置

3、map和reduce脚本中key的获取

4、map和reduce脚本中每一条数据和上一条数据比较的方法key是否可重复如果数据源1可重复,标记为M;数据源2可重复标记为N,那么join可以分为:1*1、M*1、M*N类型

1*1类型:reduce中先记录第一个value,然后在下一条直接合并输出;

M*1类型:将类型1作为标记小的输出,然后每次遇见label=1就记录value,每遇见一次label=2就输出一次最终结果;

M*N类型:遇见类型1,就用数组记录value值,遇见label=2就将将记录的数组值全部连同该行value输出。value字段数目影响每次label=1时记录的数据个数,需要将value都记录下来

 

reduce-side-join java代码

 

 

数据准备

首先是准备好数据。这个倒已经是一个熟练的过程,所要做的是把示例数据准备好,记住路径和字段分隔符。 准备好下面两张表: (1)m_ys_lab_jointest_a(以下简称表A) 建表语句为: [sql]  view plain  copy     print ?
  1. create table if not exists m_ys_lab_jointest_a (  
  2.      id bigint,  
  3.      name string  
  4. )  
  5. row format delimited  
  6. fields terminated by '9'  
  7. lines terminated by '10'  
  8. stored as textfile;  
数据:
id     name
1     北京
2     天津
3     河北
4     山西
5     内蒙古
6     辽宁
7     吉林
8     黑龙江
(2)m_ys_lab_jointest_b(以下简称表B) 建表语句为: [sql]  view plain  copy     print ?
  1. create table if not exists m_ys_lab_jointest_b (  
  2.      id bigint,  
  3.      statyear bigint,  
  4.      num bigint  
  5. )  
  6. row format delimited  
  7. fields terminated by '9'  
  8. lines terminated by '10'  
  9. stored as textfile;  
数据:
id     statyear     num
1     2010     1962
1     2011     2019
2     2010     1299
2     2011     1355
4     2010     3574
4     2011     3593
9     2010     2303
9     2011     2347

我们的目的是,以id为key做join操作,得到以下表: m_ys_lab_jointest_ab
id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

计算模型

整个计算过程是: (1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。 (2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。 如下图所示:

代码

代码如下: [java]  view plain  copy     print ?
  1. import java.io.IOException;  
  2. import java.util.HashMap;  
  3. import java.util.Iterator;  
  4. import java.util.Vector;  
  5.   
  6. import org.apache.hadoop.io.LongWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.Writable;  
  9. import org.apache.hadoop.mapred.FileSplit;  
  10. import org.apache.hadoop.mapred.JobConf;  
  11. import org.apache.hadoop.mapred.MapReduceBase;  
  12. import org.apache.hadoop.mapred.Mapper;  
  13. import org.apache.hadoop.mapred.OutputCollector;  
  14. import org.apache.hadoop.mapred.RecordWriter;  
  15. import org.apache.hadoop.mapred.Reducer;  
  16. import org.apache.hadoop.mapred.Reporter;  
  17.   
  18. /** 
  19.  * MapReduce实现Join操作 
  20.  */  
  21. public class MapRedJoin   
  22.     public static final String DELIMITER = "\\u0009"// 字段分隔符  
  23.       
  24.     // map过程  
  25.     public static class MapClass extends MapReduceBase implements  
  26.             Mapper<LongWritable, Text, Text, Text>   
  27.                           
  28.         public void configure(JobConf job)   
  29.             super.configure(job);  
  30.           
  31.           
  32.         public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,  
  33.                 Reporter reporter) throws IOException, ClassCastException   
  34.             // 获取输入文件的全路径和名称  
  35.             String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();  
  36.             // 获取记录字符串  
  37.             String line = value.toString();  
  38.             // 抛弃空记录  
  39.             if (line == null || line.equals("")) return;   
  40.               
  41.             // 处理来自表A的记录  
  42.             if (filePath.contains("m_ys_lab_jointest_a"))   
  43.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
  44.                 if (values.length < 2return;  
  45.                   
  46.                 String id = values[0]; // id  
  47.                 String name = values[1]; // name  
  48.                   
  49.                 output.collect(new Text(id), new Text("a#"+name));  
  50.               
  51.             // 处理来自表B的记录  
  52.             else if (filePath.contains("m_ys_lab_jointest_b"))   
  53.                 String[] values = line.split(DELIMITER); // 按分隔符分割出字段  
  54.                 if (values.length < 3return;  
  55.                   
  56.                 String id = values[0]; // id  
  57.                 String statyear = values[1]; // statyear  
  58.                 String num = values[2]; //num  
  59.                   
  60.                 output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));  
  61.               
  62.           
  63.       
  64.       
  65.     // reduce过程  
  66.     public static class Reduce extends MapReduceBase  
  67.             implements Reducer<Text, Text, Text, Text>   
  68.         public void reduce(Text key, Iterator<Text> values,  
  69.                 OutputCollector<Text, Text> output, Reporter reporter)  
  70.                 throws IOException   
  71.                       
  72.             Vector<String> vecA = new Vector<String>(); // 存放来自表A的值  
  73.             Vector<String> vecB = new Vector<String>(); // 存放来自表B的值  
  74.               
  75.             while (values.hasNext())   
  76.                 String value = values.next().toString();  
  77.                 if (value.startsWith("a#"))   
  78.                     vecA.add(value.substring(2));  
  79.                  else if (value.startsWith("b#"))   
  80.                     vecB.add(value.substring(2));  
  81.                   
  82.               
  83.               
  84.             int sizeA = vecA.size();  
  85.             int sizeB = vecB.size();  
  86.               
  87.             // 遍历两个向量  
  88.             int i, j;  
  89.             for (i = 0; i < sizeA; i ++)   
  90.                 for (j = 0; j < sizeB; j ++)   
  91.                     output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));  
  92.                   
  93.                  
  94.           
  95.       
  96.       
  97.     protected void configJob(JobConf conf)   
  98.         conf.setMapOutputKeyClass(Text.class);  
  99.         conf.setMapOutputValueClass(Text.class);  
  100.         conf.setOutputKeyClass(Text.class);  
  101.         conf.setOutputValueClass(Text.class);  
  102.         conf.setOutputFormat(ReportOutFormat.class);  
  103.       
  104.   

技术细节

下面说一下其中的若干技术细节: (1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下: String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString(); (2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。 (3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。

 

 

 

所有方法的java代码(巨长)

 

从别人那转来

 

1、在Reudce端进行连接。 在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下: Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例: (1)自定义一个value返回类型:
  1. package com.mr.reduceSizeJoin;   
  2. import java.io.DataInput;   
  3. import java.io.DataOutput;   
  4. import java.io.IOException;   
  5. import org.apache.hadoop.io.Text;   
  6. import org.apache.hadoop.io.WritableComparable;   
  7. public class CombineValues implements WritableComparable   
  8.     //private stat

    以上是关于MapReduce实现两表的Join--原理及python和java代码实现的主要内容,如果未能解决你的问题,请参考以下文章

    MapReduce实现两表的Join--原理及python和java代码实现

    MapReduce实现两表join

    如何使用mapreduce实现两表join

    多表查询-inner join left join right joinfull join

    两表联查 条件的问题

    SQL——左连接(Left join)右连接(Right join)内连接(Inner join)