跟A君学大数据-用MapReduce实现表关联

Posted 六点A君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了跟A君学大数据-用MapReduce实现表关联相关的知识,希望对你有一定的参考价值。

前言

前面使用 MapReduce,可以进行单词计数,单词去重,数字排序等,那么结合到数据库应用,
如何实现表关联呢?
MapReduce 更像算法题,怎么通过 Map 和 Reduce 这两个步骤来实现关联,得到所需数据呢?
例如有一张表,里面两个字段,child 和 parent,现在让你找出里面的 grandChild 和 grandParent 来。
mysql 为例,我们直接一行 sql 就可以解决:

 
   
   
 
  1. select a.child,b.parent

  2. from child_parent a, child_parent b

  3. where a.parent=b.child

  4. order by a.child desc

那么从 MapReduce 角度该如何设计 Map 以及 Reduce 函数呢?

设计

  1. 需要使得左表的 parent 和右表的 child 列相连接。

  2. 将 paren 设置为 key,而 child 作为 value 进行输出,作为左表

  3. 再将同一对 child 和 paren 的 child 设为 key,而 parent 设置为 value 作为输出。

  4. 给每个输出增加标志作为区分左右表。

  5. 在 Reduce 函数的接受的结果中,每个 key 的 value-list 包含了 grandchild 和 grandparen 关系

  6. 取出每个 key 的 value 进行解析,将左表的 child 放到一个数组,右表的 parent 放到一个数组,最后做双重循环迪卡尔集即可(就如 sql 语句中的笛卡尔集)

  7. 因为在 Reduce 中,给出的是 key 相同的 value_list,所以就是相当于上面 sql 的 where a.parent=b.child

具体实现

 
   
   
 
  1. package com.anla.chapter3.innerjoin;

  2. import org.apache.hadoop.conf.Configuration;

  3. import org.apache.hadoop.fs.Path;

  4. import org.apache.hadoop.io.Text;

  5. import org.apache.hadoop.mapreduce.Job;

  6. import org.apache.hadoop.mapreduce.Mapper;

  7. import org.apache.hadoop.mapreduce.Reducer;

  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  10. import org.apache.hadoop.util.GenericOptionsParser;

  11. import java.io.IOException;

  12. import java.util.Iterator;

  13. /**

  14. * @user anLA7856

  15. * @time 19-3-22 下午6:01

  16. * @description

  17. */

  18. public class SimpleJoin {

  19. public static int time = 0;

  20. public static class Map extends Mapper<Object, Text, Text, Text> {

  21. @Override

  22. protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

  23. String childName;

  24. String parentName;

  25. String relationType;

  26. String line = value.toString();

  27. int i = 0;

  28. // 用来寻找分隔符

  29. String[] values = line.split(" ");

  30. if (!"child".equals(values[0])) {

  31. // 不为child,即不计算第一行

  32. childName = values[0];

  33. parentName = values[1];

  34. relationType = "1"; // 左右表区分

  35. context.write(new Text(parentName), new Text(relationType+"+"+childName+"+"+parentName)); // 左表

  36. relationType = "2"; // 左右表区分

  37. context.write(new Text(childName), new Text(relationType+"+" + childName + "+" +parentName)); // 右表

  38. }

  39. }

  40. }

  41. public static class Reduce extends Reducer<Text, Text, Text, Text> {

  42. @Override

  43. protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

  44. if (time == 0) { // 输出表头

  45. context.write(new Text("grandChild"), new Text("grandParent"));

  46. time ++;

  47. }

  48. int grandChildNum = 0;

  49. String grandChild[] = new String[10];

  50. int grandParentNum = 0;

  51. String grandParent[] = new String[10];

  52. Iterator iterator = values.iterator();

  53. while (iterator.hasNext()){

  54. String record = iterator.next().toString();

  55. int len = record.length();

  56. if (len == 0) {

  57. continue;

  58. }

  59. char relationType = record.charAt(0);

  60. String childName = record.split("\\+")[1];

  61. String parentName = record.split("\\+")[2];

  62. // 左表

  63. if (relationType == '1') {

  64. grandChild[grandChildNum] = childName;

  65. grandChildNum ++;

  66. }else {

  67. grandParent[grandParentNum] = parentName;

  68. grandParentNum++;

  69. }

  70. }

  71. // grandChild和grandParent求迪卡尔

  72. if (grandChildNum != 0 && grandParentNum != 0) {

  73. for (int m = 0; m <grandChildNum; m++) {

  74. for (int n = 0; n < grandParentNum; n++){

  75. context.write(new Text(grandChild[m]), new Text(grandParent[n]));

  76. }

  77. }

  78. }

  79. }

  80. }

  81. public static void main(String[] args) throws Exception{

  82. Configuration configuration = new Configuration();

  83. String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();

  84. if (otherArgs.length != 2) {

  85. System.out.println("Usage: Sort <in> <out>");

  86. System.exit(2);

  87. }

  88. Job job = Job.getInstance(configuration, "SimpleJoin");

  89. job.setJarByClass(SimpleJoin.class);

  90. job.setMapperClass(Map.class);

  91. job.setReducerClass(Reduce.class);

  92. job.setOutputKeyClass(Text.class);

  93. job.setOutputValueClass(Text.class);

  94. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

  95. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

  96. System.exit(job.waitForCompletion(true) ? 0:1);

  97. }

  98. }

还是按照前一篇运行方法:跟 A 君学大数据 (二)- 手把手运行 Hadoop 的 WordCount 程序

得到结果:

参考资料:

  1. Hadoop In Action


以上是关于跟A君学大数据-用MapReduce实现表关联的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce编程之实现多表关联

大数据学习之十二——MapReduce代码实例:关联性操作

《从0开始学大数据》之MapReduce 计算框架是如何运作的

mapreduce-实现多表关联

mapreduce-实现单表关联

《从0开始学大数据》之Spark的编程模型