第2节 mapreduce深入学习:15reduce端的join算法的实现

Posted mediocreworld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第2节 mapreduce深入学习:15reduce端的join算法的实现相关的知识,希望对你有一定的参考价值。

reduce端的join算法:

例子:

商品表数据 product: 
pid
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000

订单表数据 order: 
           pid
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3

mapReduce可以实现sql语句的功能:select 。。。。。。from product p left join order o on p.pid = o.pid

 

思路:将关联的条件作为map输出的key。

 

缺点:这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

替代解决方案: map端join实现方式。

 

代码:

ReduceJoinMain:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool
@Override
public int run(String[] args) throws Exception

Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());
// job.setJarByClass(ReduceJoinMain.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input"));

job.setMapperClass(ReduceJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(ReduceJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output"));

boolean b = job.waitForCompletion(true);
return b?0:1;


public static void main(String[] args) throws Exception
int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
System.exit(run);




ReduceJoinMapper:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text>
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String line = value.toString();
String[] split = line.split(",");
if(line.startsWith("p"))
context.write(new Text(split[0]),value);
else
context.write(new Text(split[2]),value);




ReduceJoinReducer:
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text,Text,Text,Text>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
String firP = "";
String secP = "";

for(Text text:values)
String value = text.toString();
if(value!=null && !"".equals(value))
if(value.startsWith("p"))
secP += value;
else
firP += value+"\t";



context.write(key,new Text(firP+secP));



 

以上是关于第2节 mapreduce深入学习:15reduce端的join算法的实现的主要内容,如果未能解决你的问题,请参考以下文章

第2节 mapreduce深入学习:8手机流量汇总求和

第2节 mapreduce深入学习:12reducetask运行机制(多看几遍)

深入MapReduce计算引擎02

深入大数据架构师之路,问鼎40万年薪

大数据学习深入源码解析MapReduce的架构及实现过程

第3节 mapreduce高级:4倒排索引的建立