MapJoin的原理及案例
Posted 月疯
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapJoin的原理及案例相关的知识,希望对你有一定的参考价值。
mapJoin原理:适用于大表join小表,使用DistributedCache机制将小表存储到各个Mapper进程所在机器的磁盘空间上,各个Mapper进程读取不同的大表分片,将分片中的每一条记录与小表中所有记录进行合并
合并后直接输出map结果即可得到最终结果。注:不需要进行shuffle流程,也不需要reduce处理
案列:
detail.txt
order_id item_id amout
12 sp001 2
12 sp002 4
12 sp003 3
13 sp001 2
13 sp002 4
iteminfo.txt
item_id item_type
sp001 type001
sp002 type002
sp003 type002
package squencefile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoin
public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>
private Map<String,String> iteminfoMap = new HashMap<>();
/**
*将小表中记录加载到mapper进程机器内存中
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException
super.setup(context);
//1、读磁盘空间上的对应小表(废弃)
// URI[] uri=DistributedCache.getCacheFiles(context.getConfiguration());
URI[] paths=context.getCacheFiles();
for(URI uri:paths)
String pathName = uri.toString();
//判断是否是iteminfo小表
if(!pathName.endsWith("iteminfo.txt")) return;
//通过输入流读取磁盘上的文件
BufferedReader reader=new BufferedReader(new FileReader(pathName));
String str = null;
while((str = reader.readLine())!=null)
String[] itemInfoArr=str.split("\\t");
if(itemInfoArr.length==2)
iteminfoMap.put(itemInfoArr[0],itemInfoArr[1]);
/**
*通过读取大表中的每条记录
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
//判断是否是大表数据,需要获取到输入分片的文件名,和大表的文件名进行对比
String fileName = ((FileSplit)context.getInputSplit()).getPath().getName();
if(fileName.endsWith("detail.txt"))
//将分片中的每一条记录与小表中所有记录进行合并
String detail=value.toString();
String[] detailArr = detail.split("\\t");
if(detailArr.length != 3) return;
String itemType=iteminfoMap.get(detailArr[1]);
if(itemType == null) return;
System.out.print(detailArr);
//输出格式:<item_id,item_type+"\\t"+order_id+"\\t"+amount>
//拼接
StringBuffer sb=new StringBuffer();
sb.append(itemType).append("\\t").append(detailArr[0]).append("\\t").append(detailArr[2]);
//输出数据
context.write(new Text(detailArr[1]),new Text(sb.toString()));
//
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
//创建一个job,也就是一个运行环境
Configuration conf = new Configuration();
//判断输出目录是否存在,如果存在就删除
FileSystem fs=FileSystem.get(conf);
if(fs.exists(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out")))
fs.delete(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out"),true);
//将小表加载到各个Mapper进程所在的机器的磁盘上(废弃掉了)
// DistributedCache.addCacheFile(new Path("").toUri(),conf);
//本地运行
Job job=Job.getInstance(conf,"MapJoin");
//程序入口(打jar包)
job.setJarByClass(MapJoin.class);
//需要输入个文件:输入文件
FileInputFormat.addInputPath(job,new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\detail.txt"));
//将小表加载到各个Mapper进程所在机器的磁盘上
job.addCacheFile(new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\iteminfo.txt").toUri());
//编写mapper处理逻辑
job.setMapperClass(MapJoin.MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//输出文件
FileOutputFormat.setOutputPath(job,new Path("F:\\\\filnk_package\\\\hadoop-2.10.1\\\\data\\\\test6\\\\out"));
//运行job,需要放到Yarn上运行
boolean result =job.waitForCompletion(true);
System.out.print(result?1:0);
以上是关于MapJoin的原理及案例的主要内容,如果未能解决你的问题,请参考以下文章