Section 2, MapReduce in depth: 15. Implementing the reduce-side join algorithm
Posted by mediocreworld
The reduce-side join algorithm:

Example:

Product table data (product); the join key pid is the first field:

```
p0001,小米5,1000,2000
p0002,锤子T1,1000,3000
```

Order table data (order); the join key pid is the third field:

```
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
```
MapReduce can implement the equivalent of the SQL statement: select ... from product p left join order o on p.pid = o.pid

Idea: use the join condition (pid) as the map output key, so that records from both tables sharing the same pid are grouped into the same reduce call.

Drawback: with this approach the join itself is performed in the reduce phase, so the reducers carry almost all of the processing load while the map nodes do very little work. Resource utilization is poor, and the reduce phase is highly prone to data skew: a hot pid sends a disproportionate share of records to a single reducer.

Alternative: a map-side join.
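The map-side alternative loads the small table (here, product) into memory on every map task and joins while streaming the large order table, so no shuffle or reduce phase is needed. The following is a minimal in-memory sketch of that idea in plain Java; it is not the Hadoop DistributedCache API, and the class and method names are illustrative only:

```java
import java.util.*;

public class MapSideJoinSketch {
    // The small product table is broadcast as an in-memory map keyed by pid;
    // each order line is then joined during the "map" pass, with no shuffle.
    static List<String> mapJoin(List<String> products, List<String> orders) {
        Map<String, String> productByPid = new HashMap<>();
        for (String p : products) {
            productByPid.put(p.split(",")[0], p);
        }
        List<String> joined = new ArrayList<>();
        for (String o : orders) {
            String pid = o.split(",")[2];
            // An order with no matching product keeps an empty right side.
            joined.add(o + "\t" + productByPid.getOrDefault(pid, ""));
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> out = mapJoin(
                Arrays.asList("p0001,小米5,1000,2000"),
                Arrays.asList("1001,20150710,p0001,2"));
        // Prints the order record joined with its product record.
        System.out.println(out.get(0));
    }
}
```

Because every mapper holds the whole small table, this only works when one side of the join fits in memory; that constraint is exactly why the reduce-side join below remains the general-purpose approach.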
Code:
ReduceJoinMain:

```java
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceJoinMain extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), ReduceJoinMain.class.getSimpleName());
        // job.setJarByClass(ReduceJoinMain.class);

        // Input: both the product file and the order file live in this directory.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\input"));

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\Study\\BigData\\heima\\stage2\\4、大数据离线第四天\\map端join\\reduce_join_output"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new ReduceJoinMain(), args);
        System.exit(run);
    }
}
```
ReduceJoinMapper:

```java
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(",");
        if (line.startsWith("p")) {
            // Product record: the pid is the first field.
            context.write(new Text(split[0]), value);
        } else {
            // Order record: the pid is the third field.
            context.write(new Text(split[2]), value);
        }
    }
}
```
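The mapper's tagging rule can be checked outside Hadoop. A minimal sketch in plain Java; the helper name `joinKey` is illustrative, not part of the original job:

```java
public class JoinKeyDemo {
    // Mirror of the mapper's key choice: product rows start with "p",
    // so the key is their first field (pid); order rows carry the pid
    // in their third field.
    static String joinKey(String line) {
        String[] split = line.split(",");
        return line.startsWith("p") ? split[0] : split[2];
    }

    public static void main(String[] args) {
        System.out.println(joinKey("p0001,小米5,1000,2000"));  // p0001
        System.out.println(joinKey("1001,20150710,p0001,2")); // p0001
    }
}
```

Both sample lines map to the key p0001, which is what guarantees the matching product and order records meet in the same reduce call.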
ReduceJoinReducer:

```java
package cn.itcast.demo4.reduceJoin;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String firP = "";  // concatenated order records for this pid
        String secP = "";  // the product record for this pid
        for (Text text : values) {
            String value = text.toString();
            if (value != null && !"".equals(value)) {
                if (value.startsWith("p")) {
                    secP += value;
                } else {
                    firP += value + "\t";
                }
            }
        }
        context.write(key, new Text(firP + secP));
    }
}
```
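To see what the full map, shuffle, and reduce pipeline produces on the sample data, the whole job can be simulated in plain Java. This is a sketch for illustration only; the class and method names below are invented, not Hadoop APIs:

```java
import java.util.*;

public class ReduceJoinSimulation {
    // Simulates map (key by pid), shuffle (group by key), and reduce
    // (concatenate order records, then append the product record).
    static Map<String, String> reduceJoin(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] split = line.split(",");
            String key = line.startsWith("p") ? split[0] : split[2];
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(line);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            StringBuilder orders = new StringBuilder();
            String product = "";
            for (String v : e.getValue()) {
                if (v.startsWith("p")) {
                    product = v;
                } else {
                    orders.append(v).append("\t");
                }
            }
            result.put(e.getKey(), orders + product);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "p0001,小米5,1000,2000", "p0002,锤子T1,1000,3000",
                "1001,20150710,p0001,2", "1002,20150710,p0002,3",
                "1002,20150710,p0003,3");
        reduceJoin(lines).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

Note that the order for p0003 has no matching product record, so its output line ends with an empty product side; the real reducer above behaves the same way, which is why the result resembles a left join from the order table.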