mapreduce的join
Posted Amelie.tingting
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mapreduce的join相关的知识,希望对你有一定的参考价值。
一篇超级详细的文章:this one
读完之后感触颇深:什么时候应该在map阶段join,什么时候应该在reduce阶段join。
之前写两个输入的时候,写的多么可笑,效率极低。
先用了一遍这篇文章的分布式缓存,在reduce的时候读取,因为hadoop版本太低,所以又做了修改,结合好几篇文章结果:
版本:Hadoop0.20.203.0
package bjut.edu.ting; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Hashtable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.opencsv.CSVParser; //有两个输入:GPS(数据量大),Dictionary(数据量小);
//两者通过属性bus_line在reduce过程中连接,将dictionary放在内存,读取之时用hashtable,存储检索
//其中mapper过程通过passtime计算date,并赋值给GPS数据
public class DateLineJob{ public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{ //处理GPS数据 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ CSVParser parser = new CSVParser(); String[] gpsData = parser.parseLine(value.toString()); Integer date_label=null; try { date_label=getDateStamp(gpsData[2]); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(date_label==12&&date_label!=-1){//这儿的date_label需要修改 String outValue=date_label.toString()+","+gpsData[0]+","+gpsData[2]+","+gpsData[3]+","+gpsData[4]; //key:bus_line value:0:date,1:vehicle,2:pass,3:lon,4:lat context.write(new Text(gpsData[1]),new Text(outValue)); } } } public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text>{ //定义HashTable存放缓存数据 private Hashtable <String,String> table=new Hashtable<String,String>(); /** * 获取分布式缓存文件 */ private Path[] modelPath; private BufferedReader modelBR; protected void setup(Context context) throws IOException {
//返回本地文件路径 Configuration conf = context.getConfiguration(); modelPath = DistributedCache.getLocalCacheFiles(conf); if(modelPath.length==0){ throw new FileNotFoundException("Distributed cache file not found"); } modelBR = new BufferedReader(new FileReader(modelPath[0].toString())); //按行读取并解析字典数据, String infoDic=null; while((infoDic=modelBR.readLine())!=null){ String[] records=infoDic.split(","); //key为bus_line value为line_code table.put(records[1],records[0]);//将相应的字段存入Hashtable里面 } modelBR.close(); } public void reduce(Text key,Iterable<Text> values, Context context) throws IOException, InterruptedException{ //字典数据根据bus_line获取line_code String line_code=table.get(key.toString());//从Hashtable中获取line_code if(line_code!=null){//有些线路在字典中没有 for(Text value:values){ String outValue=value.toString(); String[] valueData=outValue.split(","); //0:date,1:vehicle,2:pass,3:lon,4:lat String out=valueData[0]+","+line_code+","+valueData[1]+","+valueData[2]+","+valueData[3]+","+valueData[4]; context.write(null, new Text(out)); } } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.set("mapreduce.admin.reduce.child.java.opts", "-Xmx512m"); DistributedCache.addCacheFile(new Path("hdfs://172.18.49.17:8020/Anewday/line_route_dict_update08_nohead.csv").toUri(), conf); DistributedCache.createSymlink(conf); Job job = new Job(conf,"join"); job.setJarByClass(DateLineJob.class); //设置GPS作为输入 FileInputFormat.addInputPath(job,new Path(args[0])); //输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)?0:1); } private static int getDateStamp(String timeStr) throws ParseException{ if(timeStr.length()==19){//若不是这个形式,则不考虑 SimpleDateFormat formatter=new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date data=formatter.parse(timeStr); int dateStamp=-1; dateStamp=data.getDate(); //这个地方会出现日期不存在的情况错误提示,但不影响运行,而且这样的数据量特别少。 return dateStamp; }else{//返回-1 return -1; } } }
以上是关于mapreduce的join的主要内容,如果未能解决你的问题,请参考以下文章
MapReduce实现两表的Join--原理及python和java代码实现
MapReduce实现两表的Join--原理及python和java代码实现
Hadoop MapReduce编程 API入门系列之join(二十五)(未完)