MapReduce实现等值连接,左外连接,右外连接,全外连接
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce实现等值连接,左外连接,右外连接,全外连接相关的知识,希望对你有一定的参考价值。
#测试数据:
# more user.txt(用户ID,用户名)
- 1 lavimer
- 2 liaozhongmin
- 3 liaozemin
#more post.txt(用户ID,帖子ID,标题)
- 1 1 java
- 1 2 c
- 2 3 hadoop
- 4 4 hive
- 5 5 hbase
- 5 6 pig
- 5 7 flume
#等值连接结果如下:
- 1 lavimer 1 1 java
- 1 lavimer 1 2 c
- 2 liaozhongmin 2 3 hadoop
#左外连接结果如下:
- 1 lavimer 1 1 java
- 1 lavimer 1 2 c
- 2 liaozhongmin 2 3 hadoop
- 3 liaozemin NULL
#右外连接结果如下:
- 1 lavimer 1 1 java
- 1 lavimer 1 2 c
- 2 liaozhongmin 2 3 hadoop
- NULL 4 4 hive
- NULL 5 5 hbase
- NULL 5 6 pig
- NULL 5 7 flume
#全外连接结果如下:
- 1 lavimer 1 1 java
- 1 lavimer 1 2 c
- 2 liaozhongmin 2 3 hadoop
- 3 liaozemin NULL
- NULL 4 4 hive
- NULL 5 5 hbase
- NULL 5 6 pig
- NULL 5 7 flume
实现代码如下:
- /**
- *
- * @author 廖钟民
- * time : 2015年1月30日下午1:23:36
- * @version
- */
- public class UserPostJoin {
- // 定义输入路径
- private static final String INPUT_PATH1 = "hdfs://liaozhongmin:9000/user_post_join/user.txt";
- private static final String INPUT_PATH2 = "hdfs://liaozhongmin:9000/user_post_join/post.txt";
- // 定义输出路径
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
- // 创建任务
- Job job = new Job(conf, UserPostJoin.class.getName());
- // 设置连接类型
- job.getConfiguration().set("joinType", "allOuter");
- // 设置多路径输入
- MultipleInputs.addInputPath(job, new Path(INPUT_PATH1), TextInputFormat.class, UserMapper.class);
- MultipleInputs.addInputPath(job, new Path(INPUT_PATH2), TextInputFormat.class, PostMapper.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(UserPost.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- //1.4 排序
- //1.5 归约
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(UserPostReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- //2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 自定义Mapper类用于处理来自user.txt文件的数据
- * @author 廖钟民
- * time : 2015年1月30日下午1:22:12
- * @version
- */
- public static class UserMapper extends Mapper<LongWritable, Text, Text, UserPost> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
- // 对字符串进行切分
- String[] arr = value.toString().split("\t");
- // 创建UserId
- Text userId = new Text(arr[0]);
- // 把结果写出去
- context.write(userId, new UserPost("U", value.toString()));
- }
- }
- /**
- * 自定义Mapper类用于处理来自post.txt文件的数据
- * @author 廖钟民
- * time : 2015年1月30日下午1:22:16
- * @version
- */
- public static class PostMapper extends Mapper<LongWritable, Text, Text, UserPost> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, UserPost>.Context context) throws IOException, InterruptedException {
- // 对数据进行切分
- String[] arr = value.toString().split("\t");
- // 创建用户ID
- Text userId = new Text(arr[0]);
- context.write(userId, new UserPost("P", value.toString()));
- }
- }
- /**
- * 自定义Reducer类用于处理不同Mapper类的输出
- * @author 廖钟民
- * time : 2015年1月30日下午1:23:05
- * @version
- */
- public static class UserPostReducer extends Reducer<Text, UserPost, Text, Text> {
- // 定义List集合用于存放用户
- private List<Text> users = new ArrayList<Text>();
- private List<Text> posts = new ArrayList<Text>();
- // 定义连接类型
- private String joinType;
- @Override
- protected void setup(Reducer<Text, UserPost, Text, Text>.Context context) throws IOException, InterruptedException {
- this.joinType = context.getConfiguration().get("joinType");
- System.out.println(joinType);
- }
- /**
- * 经过Shuffle后数据会分组,每一组数据都会调用一次reduce()函数
- *第一组数据:
- *1 lavimer
- *1 1 java
- *1 2 c
- *
- *第二组数据:
- *2 3 hadoop
- *2 liaozhongmin
- *
- *第三组数据:
- *3 liaozemin
- *
- *第四组数据:
- *4 4 hive
- *
- *第五组数据:
- *5 5 hbase
- *5 6 pig
- *5 7 flume
- *
- *每一组数据都会调用一次reduce()函数,我们以第一组数据为例进行讲解:
- *
- *进入reduce函数后,<1,lavimer>会被添加到users集合中
- *<1 1 java>和<1 2 c>会被添加到posts集合中
- *
- *然后是判断当前操作是什么类型的连接,我们以等值连接为例:
- *遍历两个集合得到的数据为:
- *【1 lavimer 1 1 java】
- *【1 lavimer 1 2 c】
- *
- *这是第一组数据的执行轨迹,其他依次类推就可以得到相关的操作
- */
- protected void reduce(Text key, Iterable<UserPost> values, Reducer<Text, UserPost, Text, Text>.Context context) throws IOException,
- InterruptedException {
- // 清空集合
- users.clear();
- posts.clear();
- // 迭代values集合把当前穿进来的某个组中的数据分别添加到对应的集合中
- for (UserPost val : values) {
- System.out.println("实际值:" + key + "===>" + values.toString());
- if (val.getType().equals("U")) {
- users.add(new Text(val.getData()));
- } else {
- posts.add(new Text(val.getData()));
- }
- }
- // 根据joinType关键字做对应的连接操作
- if (joinType.equals("innerJoin")) {// 内连接
- if (users.size() > 0 && posts.size() > 0) {
- for (Text user : users) {
- for (Text post : posts) {
- context.write(new Text(user), new Text(post));
- }
- }
- }
- } else if (joinType.equals("leftOuter")) {// 左外连接
- for (Text user : users) {
- if (posts.size() > 0) {
- for (Text post : posts) {
- context.write(new Text(user), new Text(post));
- }
- } else {
- context.write(new Text(user), createEmptyPost());
- }
- }
- } else if (joinType.equals("rightOuter")) {// 右外连接
- for (Text post : posts) {
- if (users.size() > 0) {
- for (Text user : users) {
- context.write(new Text(user), new Text(post));
- }
- } else {
- context.write(createEmptyUser(), post);
- }
- }
- } else if (joinType.equals("allOuter")) {// 全外连接
- if (users.size() > 0) {
- for (Text user : users) {
- if (posts.size() > 0) {
- for (Text post : posts) {
- context.write(new Text(user), new Text(post));
- }
- } else {
- context.write(new Text(user), createEmptyPost());
- }
- }
- } else {
- for (Text post : posts) {
- if (users.size() > 0) {
- for (Text user : users) {
- context.write(new Text(user), new Text(post));
- }
- } else {
- context.write(createEmptyUser(), post);
- }
- }
- }
- }
- }
- /**
- * 用户为空时用制表符代替
- *
- * @return
- */
- private Text createEmptyUser() {
- return new Text("NULL");
- }
- /**
- * 帖子为空时用制表符代替
- *
- * @return
- */
- private Text createEmptyPost() {
- return new Text("NULL");
- }
- }
- }
- /**
- * 自定义实体类封装两个表的数据
- * @author 廖钟民
- * time : 2015年1月30日下午1:23:50
- * @version
- */
- class UserPost implements Writable {
- // 类型(U表示用户,P表示帖子)
- private String type;
- private String data;
- public UserPost() {
- }
- public UserPost(String type, String data) {
- this.type = type;
- this.data = data;
- }
- public String getType() {
- return type;
- }
- public void setType(String type) {
- this.type = type;
- }
- public String getData() {
- return data;
- }
- public void setData(String data) {
- this.data = data;
- }
- public void write(DataOutput out) throws IOException {
- out.writeUTF(this.type);
- out.writeUTF(this.data);
- }
- public void readFields(DataInput in) throws IOException {
- this.type = in.readUTF();
- this.data = in.readUTF();
- }
- }
以上是关于MapReduce实现等值连接,左外连接,右外连接,全外连接的主要内容,如果未能解决你的问题,请参考以下文章
Hive sql中的 各种join(内连接左外连接右外连接满外连接)
Hive sql中的 各种join(内连接左外连接右外连接满外连接)