RDD Join相关API,以及程序

Posted 曹军

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD Join相关API,以及程序相关的知识,希望对你有一定的参考价值。

1.数据集  

  A表数据:
    1 a
    2 b
    3 c
  B表数据:
    1 aa1
    1 aa2
    2 bb1
    2 bb2
    2 bb3
    4 dd1

 

2.join的分类

  inner join

  left outer join

  right outer join

  full outer join

  left semi join

  

 

3.集中join的结果

  A inner join B:
    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3

  A left outer join B:
    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    3 c null null

  A right outer join B:
    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    null null 4 dd1

  A full outer join B:
    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    3 c null null
    null null 4 dd1

  A left semi join B:(。。。。。注意。。。。。。)
    1 a
    2 b

 

4.API  

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value,
  b的第二个元素是来自参数other这个RDD的value

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据

 

5.其他方式实现join

  

 

6.join程序以及非join实现join

  1 package com.ibeifeng.senior.join
  2 
  3 import org.apache.spark.{SparkConf, SparkContext}
  4 
  5 /**
  6   * RDD数据Join相关API讲解
  7   * Created by ibf on 02/09.
  8   */
  9 object RDDJoin {
 10   def main(args: Array[String]): Unit = {
 11     val conf = new SparkConf()
 12       .setMaster("local[*]")
 13       .setAppName("RDD-Join")
 14     val sc = SparkContext.getOrCreate(conf)
 15 
 16     // ==================具体代码======================
 17     // 模拟数据产生
 18     val rdd1 = sc.parallelize(Array(
 19       (1, "张三1"),
 20       (1, "张三2"),
 21       (2, "李四"),
 22       (3, "王五"),
 23       (4, "Tom"),
 24       (5, "Gerry"),
 25       (6, "莉莉")
 26     ), 1)
 27 
 28     val rdd2 = sc.parallelize(Array(
 29       (1, "上海"),
 30       (2, "北京1"),
 31       (2, "北京2"),
 32       (3, "南京"),
 33       (4, "纽约"),
 34       (6, "深圳"),
 35       (7, "香港")
 36     ), 1)
 37 
 38     // 调用RDD API实现内连接
 39     val joinResultRDD = rdd1.join(rdd2).map {
 40       case (id, (name, address)) => {
 41         (id, name, address)
 42       }
 43     }
 44     println("----------------")
 45     joinResultRDD.foreachPartition(iter => {
 46       iter.foreach(println)
 47     })
 48     // 调用RDD API实现左外连接
 49     val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map {
 50       case (id, (name, addressOption)) => {
 51         (id, name, addressOption.getOrElse("NULL"))
 52       }
 53     }
 54     println("----------------")
 55     leftJoinResultRDd.foreachPartition(iter => {
 56       iter.foreach(println)
 57     })
 58     // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in)
 59     println("----------------")
 60     rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map {
 61       case (id, (name, _)) => (id, name)
 62     }.foreachPartition(iter => {
 63       iter.foreach(println)
 64     })
 65 
 66     // 右外连接
 67     println("----------------")
 68     rdd1
 69       .rightOuterJoin(rdd2)
 70       .map {
 71         case (id, (nameOption, address)) => {
 72           (id, nameOption.getOrElse("NULL"), address)
 73         }
 74       }
 75       .foreachPartition(iter => iter.foreach(println))
 76 
 77     // 全外连接
 78     println("----------------")
 79     rdd1
 80       .fullOuterJoin(rdd2)
 81       .map {
 82         case (id, (nameOption, addressOption)) => {
 83           (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL"))
 84         }
 85       }
 86       .foreachPartition(iter => iter.foreach(println))
 87 
 88     ///////////////////////////////////////////假设rdd2的数据比较少,将rdd2的数据广播出去///////////////////////////////////////
 89     val leastRDDCollection = rdd2.collect()
 90     val broadcastRDDCollection = sc.broadcast(leastRDDCollection) 93     // Inner Join 95     rdd1
 96       // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉
 97       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
 98       // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用fla  tMap
 99       .flatMap {
100       case (id, name) => {
101         broadcastRDDCollection.value.filter(_._1 == id).map {
102           case (_, address) => {
103             (id, name, address)
104           }
105         }
106       }
107     }
108       .foreachPartition(iter => iter.foreach(println))
109 
110     // 左外连接
111     println("---------------------")
112     rdd1
113       .flatMap {
114         case (id, name) => {
115           // 从右表所属的广播变量中获取对应id的集合列表
116           val list = broadcastRDDCollection.value.filter(_._1 == id)
117           // 对应id的集合可能为空,也可能数据有多个
118           if (list.nonEmpty) {
119             // 存在多个
120             list.map(tuple => (id, name, tuple._2))
121           } else {
122             // id在右表中不存在,填默认值
123             (id, name, "NULL") :: Nil
124           }
125         }
126       }
127       .foreachPartition(iter => iter.foreach(println))
128 
129     // 右外连接
130     /**
131       * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下
132       **/
133     println("---------------------")
134     // 将rdd1中符合条件的数据过滤出来保存到driver中
135     val stage1 = rdd1
136       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
137       .collect()
138     // 将driver中两个集合进行right join
139     val stage2 = leastRDDCollection.flatMap {
140       case (id, address) => {
141         val list = stage1.filter(_._1 == id)
142         if (list.nonEmpty) {
143           list.map(tuple => (id, tuple._2, address))
144         } else {
145           Iterator.single((id, "NULL", address))
146         }
147       }
148     }
149     stage2.foreach(println)
150 
151     // TODO: 全外连接,不写代码,因为代码比较复杂
152   
153     //====================================
154     // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次
155     println("+++++++++++++++++")
156     println("-----------------------")
157     rdd1
158       .join(rdd2)
159       .map {
160         case (id, (name, _)) => (id, name)
161       }
162       .distinct()
163       .foreachPartition(iter => iter.foreach(println))
164     println("------------------------")
165     rdd1
166       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
167       .foreachPartition(iter => iter.foreach(println))
168 
169     // 休眠为了看4040页面
170         Thread.sleep(1000000)
171   }
172 }

 

6.

 

以上是关于RDD Join相关API,以及程序的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD groupByKey + join vs join 性能

RDD Join中宽依赖与窄依赖的判断

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

spark核心RDD的概念解析创建以及相关操作

Spark 中Transformation Action操作 以及RDD的持久化

Spark 中Transformation Action操作 以及RDD的持久化