MapReduce-join连接
Posted enzodin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce-join连接相关的知识,希望对你有一定的参考价值。
join连接
MapReduce能够执行大型数据集间的连接(join)操作。连接操作的具体实现技术取决于数据集的规模及分区方式
连接操作如果由mapper执行,则称为“map端连接”;如果由reducer执行,则称为“reduce端连接”。
Map端连接
在两个大规模输入数据集之间的map端连接会在数据到达map函数之前就执行连接操作。为达到该目的,各map的输入数据必须先分区并且以特定方式排序。各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。
Map端连接操作可以连接多个作业的输出,只要这些作业的reducer数量相同、键相同并且输出文件是不可切分的(例如,小于一个HDFS块,或gzip压缩)。
Reduce端连接
由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map端连接更为常用。但是,由于两个数据集均需经过MapReduce的shuffle过程,所以reduce端连接的效率往往要低一些。基本思路是mapper为各个记录标记源,并且使用连接件作为map输出键,使键相同的记录放在同一reducer中。
需要使用以下技术
1.多输入
数据集的输入源往往有多中格式,因此可以使用MultipleInputs类来方便地解析和标注各个源。
2.辅助排序
reducer将从两个源中选出键相同的记录且并不介意这些记录是否已排好序。此外,为了更好的执行连接操作,先将某一个源的数据传输到reducer会非常重要。
举个例子
现有气象站文件及气象数据文件,需要将两个文件进行关联
气象站文件内容如下
00001,北京 00002,天津 00003,山东
气象数据文件内容如下
00001,20180101,15 00001,20180102,16 00002,20180101,25 00002,20180102,26 00003,20180101,35 00003,20180102,36
要求:输出气象站ID 气象站名称及气象数据
代码如下
1.JoinRecordWithStationName类
package com.zhen.mapreduce.join; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author FengZhen * @date 2018年9月16日 * */ public class JoinRecordWithStationName extends Configured implements Tool{ /** * 在reduce端连接中,标记气象站记录的mapper * @author FengZhen * 00001,北京 00002,天津 00003,山东 */ static class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text>{ private NcdcStationMetadataParser parser = new NcdcStationMetadataParser(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context) throws IOException, InterruptedException { if (parser.parse(value.toString())) { context.write(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName())); } } } /** * 在reduce端连接中标记天气记录的mapper * @author FengZhen * 00001,20180101,15 00001,20180102,16 00002,20180101,25 00002,20180102,26 00003,20180101,35 00003,20180102,36 */ static class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context) throws IOException, InterruptedException { parser.parse(value.toString()); context.write(new TextPair(parser.getStationId(), "1"), value); } } /** * reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条输出记录的一部分写到输出文件。 * @author FengZhen * */ static class JoinReducer extends Reducer<TextPair, Text, Text, Text> { @Override protected void reduce(TextPair key, Iterable<Text> values, Reducer<TextPair, Text, Text, Text>.Context context) throws IOException, InterruptedException { Iterator<Text> iterator = values.iterator(); //取气象站名 Text stationName = new Text(iterator.next()); while (iterator.hasNext()) { Text record = iterator.next(); Text outValue = new Text(stationName.toString() + " " + record.toString()); context.write(key.getFirst(), outValue); } } } static class KeyPartitioner extends Partitioner<TextPair, Text>{ @Override public int getPartition(TextPair key, Text value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf()); job.setJobName("JoinRecordWithStationName"); job.setJarByClass(JoinRecordWithStationName.class); Path ncdcInputPath = new Path(args[0]); Path stationInputPath = new Path(args[1]); Path outputPath = new Path(args[2]); MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class); FileOutputFormat.setOutputPath(job, outputPath); job.setPartitionerClass(KeyPartitioner.class); job.setGroupingComparatorClass(TextPair.FirstComparator.class); job.setMapOutputKeyClass(TextPair.class); job.setReducerClass(JoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) { String[] params = new String[] { "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/record", "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/station", "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/output"}; int exitCode = 0; try { exitCode = ToolRunner.run(new JoinRecordWithStationName(), params); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } System.exit(exitCode); } }
2.NcdcRecordParser类
package com.zhen.mapreduce.join; import java.io.Serializable; /** * @author FengZhen * @date 2018年9月9日 * 解析天气数据 */ public class NcdcRecordParser implements Serializable{ private static final long serialVersionUID = 1L; /** * 气象台ID */ private String stationId; /** * 时间 */ private long timeStamp; /** * 气温 */ private Integer temperature; /** * 解析 * @param value */ public void parse(String value) { String[] values = value.split(","); if (values.length >= 3) { stationId = values[0]; timeStamp = Long.parseLong(values[1]); temperature = Integer.valueOf(values[2]); } } /** * 校验是否合格 * @return */ public boolean isValidTemperature() { return null != temperature; } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } public long getTimeStamp() { return timeStamp; } public void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; } public Integer getTemperature() { return temperature; } public void setTemperature(Integer temperature) { this.temperature = temperature; } }
3.NcdcStationMetadataParser类
package com.zhen.mapreduce.join; import java.io.Serializable; /** * @author FengZhen * @date 2018年9月9日 * 解析气象台数据 */ public class NcdcStationMetadataParser implements Serializable{ private static final long serialVersionUID = 1L; /** * 气象台ID */ private String stationId; /** * 气象台名称 */ private String stationName; /** * 解析 * @param value */ public boolean parse(String value) { String[] values = value.split(","); if (values.length >= 2) { stationId = values[0]; stationName = values[1]; return true; } return false; } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } public String getStationName() { return stationName; } public void setStationName(String stationName) { this.stationName = stationName; } }
4.TextPair类
package com.zhen.mapreduce.join; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * @author FengZhen * @date 2018年9月16日 * */ public class TextPair implements WritableComparable<TextPair>{ private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair textPair = (TextPair) obj; return first.equals(textPair.first) && second.equals(textPair.second); } return false; } public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp != 0) { return cmp; } return second.compareTo(o.second); } public Text getFirst() { return first; } public void setFirst(Text first) { this.first = first; } public Text getSecond() { return second; } public void setSecond(Text second) { this.second = second; } @Override public String toString() { return first + " " + second; } /** * 比较两个int值大小 * 降序 * @param a * @param b * @return */ public static int compare(Text a, Text b) { return a.compareTo(b); } static class FirstComparator extends WritableComparator{ protected FirstComparator() { super(TextPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { TextPair ip1 = (TextPair) a; TextPair ip2 = (TextPair) b; return TextPair.compare(ip1.getFirst(), ip2.getFirst()); } } }
打jar包,上传并执行
scp /Users/FengZhen/Desktop/Hadoop/file/JoinRecordWithStationName.jar [email protected]:/usr/local/test/mr hadoop jar JoinRecordWithStationName.jar com.zhen.mapreduce.join.JoinRecordWithStationName
结果如下
00001 北京 00001,20180102,16 00001 北京 00001,20180101,15 00002 天津 00002,20180102,26 00002 天津 00002,20180101,25 00003 山东 00003,20180102,36 00003 山东 00003,20180101,35
以上是关于MapReduce-join连接的主要内容,如果未能解决你的问题,请参考以下文章
使用实体框架迁移时 SQL Server 连接抛出异常 - 添加代码片段
错误:E/RecyclerView:未连接适配器;跳过片段上的布局
连接MySQL出现错误:ERROR 1045 (28000): Access denied for user ‘root‘@‘localhost‘ (using password: YES)(代码片段