MapJoin的原理及案例

Posted 月疯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapJoin的原理及案例相关的知识,希望对你有一定的参考价值。

mapJoin原理:适用于大表join小表,使用DistributedCache机制将小表存储到各个Mapper进程所在机器的磁盘空间上,各个Mapper进程读取不同的大表分片,将分片中的每一条记录与小表中所有记录进行合并
合并后直接输出map结果即可得到最终结果。

注:不需要进行shuffle流程,也不需要reduce处理

案列: 

detail.txt

order_id    item_id    amout
12    sp001    2
12    sp002    4
12    sp003    3
13    sp001    2
13    sp002    4

iteminfo.txt

item_id    item_type
sp001    type001
sp002    type002
sp003    type002 

package squencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoin 

    public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>
        private Map<String,String> iteminfoMap = new HashMap<>();
        /**
         *将小表中记录加载到mapper进程机器内存中
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException 
            super.setup(context);
            //1、读磁盘空间上的对应小表(废弃)
//            URI[] uri=DistributedCache.getCacheFiles(context.getConfiguration());
            URI[] paths=context.getCacheFiles();
            for(URI uri:paths)
                String pathName = uri.toString();
                //判断是否是iteminfo小表
                if(!pathName.endsWith("iteminfo.txt")) return;
                //通过输入流读取磁盘上的文件
                BufferedReader reader=new BufferedReader(new FileReader(pathName));
                String str = null;
                while((str = reader.readLine())!=null)
                    String[] itemInfoArr=str.split("\\t");

                    if(itemInfoArr.length==2)
                        iteminfoMap.put(itemInfoArr[0],itemInfoArr[1]);
                    
                
            
        
        /**
         *通过读取大表中的每条记录
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
            //判断是否是大表数据,需要获取到输入分片的文件名,和大表的文件名进行对比
            String fileName = ((FileSplit)context.getInputSplit()).getPath().getName();
            if(fileName.endsWith("detail.txt"))
                //将分片中的每一条记录与小表中所有记录进行合并
                String detail=value.toString();
                String[] detailArr = detail.split("\\t");
                if(detailArr.length != 3) return;
                String itemType=iteminfoMap.get(detailArr[1]);

                if(itemType == null) return;
                System.out.print(detailArr);
                //输出格式:<item_id,item_type+"\\t"+order_id+"\\t"+amount>
                //拼接
                StringBuffer sb=new StringBuffer();
                sb.append(itemType).append("\\t").append(detailArr[0]).append("\\t").append(detailArr[2]);
                //输出数据
                context.write(new Text(detailArr[1]),new Text(sb.toString()));
            
        
    
    //
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
        //创建一个job,也就是一个运行环境
        Configuration conf = new Configuration();
        //判断输出目录是否存在,如果存在就删除
        FileSystem fs=FileSystem.get(conf);
        if(fs.exists(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out")))
            fs.delete(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out"),true);
        
        //将小表加载到各个Mapper进程所在的机器的磁盘上(废弃掉了)
//        DistributedCache.addCacheFile(new Path("").toUri(),conf);

        //本地运行
        Job job=Job.getInstance(conf,"MapJoin");
        //程序入口(打jar包)
        job.setJarByClass(MapJoin.class);

        //需要输入个文件:输入文件
        FileInputFormat.addInputPath(job,new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\detail.txt"));

        //将小表加载到各个Mapper进程所在机器的磁盘上
        job.addCacheFile(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\iteminfo.txt").toUri());
        //编写mapper处理逻辑
        job.setMapperClass(MapJoin.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //输出文件
        FileOutputFormat.setOutputPath(job,new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out"));

        //运行job,需要放到Yarn上运行
        boolean result =job.waitForCompletion(true);
        System.out.print(result?1:0);
    

 

以上是关于MapJoin的原理及案例的主要内容,如果未能解决你的问题,请参考以下文章

怎么知道任务开始mapjoin

MapReduce算法形式四:mapjoin

HiveSpark优化案例

Hive Map Join

spring入门案例分析及原理

BASE64编码原理分析脚本实现及逆向案例