MapReduce之Map Join

Posted perfectdata

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce之Map Join相关的知识,希望对你有一定的参考价值。

一 介绍

之所以存在Reduce Join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map Join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

为了支持文件的共享,Hadoop用到了分布式缓存的概念,在MapReduce中称为DistributedCache(目前已被标注为弃用,分布式缓存的API可在Job类本身调用),它可以方便Map Task之间或Reduce Task之间共享一些信息,同时也可以将第三方Jar包添加到其Classpass路径中。Hadoop会将缓存数据分发到集群中所有准备启动的节点上,复制到mapreduce.temp.dir中的配置目录。

使用该类的方法如下:

job.addArchiveToClassPath(archive); //缓存jar包到task运行节点的classpath中
ob.addCacheArchive(uri); //缓存压缩包到task运行节点的工作目录
job.addFileToClassPath(file); //缓存普通文件到task运行节点的classpath中
job.addCacheFile(url); //将产品表文件缓存到task工作节点的工作目录中去

传参格式:hdfs://namenode:9000/home/XXX/file,即Jar包、压缩包、普通文件所在hdfs路径。

同时DistributedCache(分布式缓存)可用来解决join算法实现中的数据倾斜问题,例如两张表:订单表和产品表。

订单表:

订单号 时间 商品id 购买数量 
1001,20170710,P0001,1 
1002,20170710,P0001,3 
1003,20170710,P0002,3 
1004,20170710,P0002,4

产品表: 

商品id 商品名称 
P0001,xiaomi
P0002,huawei

需求就是根据外键商品id来将两张表信息合并,拼接成 :

1001 ,20170710,P0001,1 xiaomi
1002,20170710,P0001,3 xiaomi
1003,20170710,P0002,3,huawei
1004,20170710,P0002,4,huawei

考虑问题:在mapreduce程序中,如果某些产品非常畅销,肯定会产生很多订单,但是刚好这些订单信息都传到了一个reduce中(分区默认就是使用hashcode%reducetask数量,所以这种情况是正常的)。那么这个reducetask压力就很大了,而其他的reducetask处理的信息就很小,有的甚至就处理几条数据,这就出现了数据倾斜问题。

解决方案:一般来说订单表的数据远远多于产品表数据,毕竟产品的种类就那些,所以我们可以把产品信息都交给Map Task就行了逻辑都让Map Task来处理,也就是说不使用Reduce了,而让每个Map Task持有个product.data(存储产品信息的文件)即可。那么maptask怎么获得这个文件呢?刚好hadoop提供了DistributedCache,我们将文件交给这个分布式缓存,它会将我们的文件放到Map Task的工作目录中,那么Map 端可以直接从工作目录中去拿。

二 代码部分

  1 package mapreduce.DistributedCache;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.FileInputStream;
  5 import java.io.IOException;
  6 import java.io.InputStream;
  7 import java.io.InputStreamReader;
  8 import java.net.URI;
  9 import java.util.HashMap;
 10 import java.util.Map;
 11 import org.apache.hadoop.conf.Configuration;
 12 import org.apache.hadoop.conf.Configured;
 13 import org.apache.hadoop.fs.Path;
 14 import org.apache.hadoop.io.LongWritable;
 15 import org.apache.hadoop.io.NullWritable;
 16 import org.apache.hadoop.io.Text;
 17 import org.apache.hadoop.mapreduce.Job;
 18 import org.apache.hadoop.mapreduce.Mapper;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 21 import org.apache.hadoop.util.Tool;
 22 import org.apache.hadoop.util.ToolRunner;
 23 
 24 public class MapJoin extends Configured implements Tool{
 25     static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
 26         //用来缓存小文件(商品文件中的数据)
 27         Map<String, String> produceMap = new HashMap<String,String>();
 28         Text k = new Text();
 29         /*
 30          * 源码中能看到在循环执行map()之前会执行一次setUp方法,可以用来做初始化
 31          */
 32         @Override
 33         protected void setup(Context context)
 34                 throws IOException, InterruptedException {
 35 
 36             //将商品文件中的数据写到缓存中  
 37             FileInputStream fileInput = new FileInputStream("product.data");
 38             //read data
 39             InputStreamReader readFile = new InputStreamReader(fileInput );
 40             BufferedReader br = new BufferedReader(readFile);
 41             String line = null;
 42             while((line=br.readLine())!=null){
 43                 //一行数据格式为P0001,xiaomi(商品id,商品名称)
 44                 String[] fields = line.split(",");
 45                 produceMap.put(fields[0], fields[1]);
 46             }
 47         }
 48         @Override
 49         protected void map(LongWritable key, Text value, Context context)
 50                 throws IOException, InterruptedException {
 51             //一行订单数据    格式为 1001,20170710,P0001,1(订单id,创建时间,商品id,购买商品数量)
 52             String line = value.toString();
 53             String[] fields = line.split(",");
 54             //根据订单数据中商品id在缓存中找出来对应商品信息(商品名称),进行串接
 55             String productName = produceMap.get(fields[2]);
 56             k.set(line+","+productName);
 57             context.write(NullWritable.get(), k );
 58         }
 59     }
 60     
 61     public int run(String[] args) throws Exception {
 62 
 63     // step 1:get configuration
 64     Configuration conf = this.getConf();
 65         //set job
 66     Job job = Job.getInstance(conf);
 67         job.setJarByClass(MapJoin.class);
 68 
 69         job.setMapperClass(MapJoinMapper.class);
 70         job.setMapOutputKeyClass(Text.class);
 71         job.setMapOutputValueClass(NullWritable.class);
 72 
 73         //设置最终输出类型
 74         job.setOutputKeyClass(Text.class);
 75         job.setOutputValueClass(NullWritable.class);
 76 
 77         //将产品表文件缓存到task工作节点的工作目录中去
 78         //缓存普通文件到task运行节点的工作目录(hadoop帮我们完成)
 79         job.addCacheFile(new URI("hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/product.data"));
 80 
 81         //不需要reduce,那么也就没有了shuffle过程
 82         job.setNumReduceTasks(0);
 83 
 84         FileInputFormat.setInputPaths(job, new Path(args[0]));
 85         FileOutputFormat.setOutputPath(job, new Path(args[1]));
 86 
 87         boolean isSuccess = job.waitForCompletion(true);
 88         
 89         return isSuccess ? 0 : 1;
 90     }
 91     
 92     public static void main(String[] args) throws Exception {
 93         args = new String[]{
 94                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/mapjoin/orderid.data",
 95                 "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output4"
 96         };
 97         
 98         Configuration conf = new Configuration();
 99         
100         // run mapreduce
101         int status = ToolRunner.run(conf, new MapJoin(), args);
102         
103         // exit program
104         System.exit(status);
105     }
106 }

运行代码后查看输出结果

[[email protected] hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output4/p*
1001,20170710,P0001,1,xiaomi 
1002,20170710,P0001,3,xiaomi 
1003,20170710,P0002,3,huawei
1004,20170710,P0002,4,huawei

  

以上是关于MapReduce之Map Join的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce表连接操作之Reduce端join

MapReduce Join的使用

Hive Join 优化之 Map Join

mapreduce join操作

MapReduce编程之Semi Join多种应用场景与使用

MapReduce-join连接