Mapjoin和Reducejoin案例
Posted hellobigtable
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mapjoin和Reducejoin案例相关的知识,希望对你有一定的参考价值。
一、Mapjoin案例
1.需求:有两个文件,分别是订单表、商品表,
订单表有三个属性分别为订单时间、商品id、订单id(表示内容量大的表),
商品表有两个属性分别为商品id、商品名称(表示内容量小的表,用于加载到内存),
要求结果文件为在订单表中的每一行最后添加商品id对应的商品名称。
2.解决思路:
将商品表加载到内存中,然后再map方法中将订单表中的商品id对应的商品名称添加到该行的最后,不需要Reducer,并在Driver执行类中设置setCacheFile和numReduceTask。
3.代码如下:
public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ HashMap<String, String> pdMap = new HashMap<>(); //1.商品表加载到内存 protected void setup(Context context) throws IOException { //加载缓存文件 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8")); String line; while(StringUtils.isNotEmpty(line = br.readLine()) ) { //切分 String[] fields = line.split(" "); //缓存 pdMap.put(fields[0], fields[1]); } br.close(); } //2.map传输 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切割 String[] fields = line.split(" "); //获取订单中商品id String pid = fields[1]; //根据订单商品id获取商品名 String pName = pdMap.get(pid); //拼接数据 line = line + " " + pName; //输出 context.write(new Text(line), NullWritable.get()); } } public class CacheDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // 1.获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.获取jar包 job.setJarByClass(CacheDriver.class); // 3.获取自定义的mapper与reducer类 job.setMapperClass(CacheMapper.class); // 5.设置reduce输出的数据类型(最终的数据类型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 6.设置输入存在的路径与处理后的结果路径 FileInputFormat.setInputPaths(job, new Path("c://table1029//in")); FileOutputFormat.setOutputPath(job, new Path("c://table1029//out")); //加载缓存商品数据 job.addCacheFile(new URI("file:///c:/inputcache/pd.txt")); //设置一下reducetask的数量 job.setNumReduceTasks(0); // 7.提交任务 boolean rs = job.waitForCompletion(true); System.out.println(rs ? 0 : 1); } }
二、Reducejoin案例
1.需求:同上的两个数据文件,要求将订单表中的商品id替换成对应的商品名称。
2.解决思路:封装TableBean类,包含属性:时间、商品id、订单id、商品名称、flag(flag用来判断是哪张表),
使用Mapper读两张表,通过context对象获取切片对象,然后通过切片获取切片名称和路径的字符串来判断是哪张表,再将切片的数据封装到TableBean对象,最后以产品id为key、TableBean对象为value传输到Reducer端;
Reducer接收数据后通过flag判断是哪张表,因为一个reduce中的所有数据的key是相同的,将商品表的商品id和商品名称读入到一个TableBean对象中,然后将订单表的中的数据读入到TableBean类型的ArrayList对象中,然后将ArrayList中的每个TableBean的商品id替换为商品名称,然后遍历该数组以TableBean为key输出。
3.代码如下:
/** * @author: PrincessHug * @date: 2019/3/30, 2:37 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class TableBean implements Writable { private String timeStamp; private String productId; private String orderId; private String productName; private String flag; public TableBean() { } public String getTimeStamp() { return timeStamp; } public void setTimeStamp(String timeStamp) { this.timeStamp = timeStamp; } public String getProductId() { return productId; } public void setProductId(String productId) { this.productId = productId; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(timeStamp); out.writeUTF(productId); out.writeUTF(orderId); out.writeUTF(productName); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { timeStamp = in.readUTF(); productId = in.readUTF(); orderId = in.readUTF(); productName = in.readUTF(); flag = in.readUTF(); } @Override public String toString() { return timeStamp + " " + productName + " " + orderId; } } public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //通过切片获取文件信息 FileSplit split = (FileSplit) context.getInputSplit(); String name = split.getPath().getName(); //获取一行数据、定义TableBean对象 String line = value.toString(); TableBean tb = new TableBean(); Text t = new Text(); //判断是哪一张表 if (name.contains("order.txt")){ String[] fields = line.split(" "); tb.setTimeStamp(fields[0]); tb.setProductId(fields[1]); tb.setOrderId(fields[2]); tb.setProductName(""); tb.setFlag("0"); t.set(fields[1]); }else { String[] fields = line.split(" "); tb.setTimeStamp(""); tb.setProductId(fields[0]); tb.setOrderId(""); tb.setProductName(fields[1]); tb.setFlag("1"); t.set(fields[0]); } context.write(t,tb); } } public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { //分别创建用来存储订单表和产品表的集合 ArrayList<TableBean> orderBean = new ArrayList<>(); TableBean productBean = new TableBean(); //遍历values,通过flag判断是产品表还是订单表 for (TableBean v:values){ if (v.getFlag().equals("0")){ TableBean tableBean = new TableBean(); try { BeanUtils.copyProperties(tableBean,v); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBean.add(tableBean); }else { try { BeanUtils.copyProperties(productBean,v); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //拼接表 for (TableBean ob:orderBean) { ob.setProductName(productBean.getProductName()); context.write(ob,NullWritable.get()); } } } public class TableDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //jar包 job.setJarByClass(TableDriver.class); //Mapper、Reducer job.setMapperClass(TableMapper.class); job.setReducerClass(TableReducer.class); //Mapper输出数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); //Reducer输出数据类型 job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); //输入输出路径 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\reducejoin\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\reducejoin\\out")); //提交任务 if (job.waitForCompletion(true)){ System.out.println("运行完成!"); }else { System.out.println("运行失败!"); } } }
以上是关于Mapjoin和Reducejoin案例的主要内容,如果未能解决你的问题,请参考以下文章