RDD Join相关API,以及程序
Posted 曹军
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD Join相关API,以及程序相关的知识,希望对你有一定的参考价值。
1.数据集
A表数据:
1 a
2 b
3 c
B表数据:
1 aa1
1 aa2
2 bb1
2 bb2
2 bb3
4 dd1
2.join的分类
inner join
left outer join
right outer join
full outer join
left semi join
3.集中join的结果
A inner join B:
1 a 1 aa1
1 a 1 aa2
2 b 2 bb1
2 b 2 bb2
2 b 2 bb3
A left outer join B:
1 a 1 aa1
1 a 1 aa2
2 b 2 bb1
2 b 2 bb2
2 b 2 bb3
3 c null null
A right outer join B:
1 a 1 aa1
1 a 1 aa2
2 b 2 bb1
2 b 2 bb2
2 b 2 bb3
null null 4 dd1
A full outer join B:
1 a 1 aa1
1 a 1 aa2
2 b 2 bb1
2 b 2 bb2
2 b 2 bb3
3 c null null
null null 4 dd1
A left semi join B:(。。。。。注意。。。。。。)
1 a
2 b
4.API
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value,
b的第二个元素是来自参数other这个RDD的value
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据
5.其他方式实现join
6.join程序以及非join实现join
1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7 * Created by ibf on 02/09. 8 */ 9 object RDDJoin { 10 def main(args: Array[String]): Unit = { 11 val conf = new SparkConf() 12 .setMaster("local[*]") 13 .setAppName("RDD-Join") 14 val sc = SparkContext.getOrCreate(conf) 15 16 // ==================具体代码====================== 17 // 模拟数据产生 18 val rdd1 = sc.parallelize(Array( 19 (1, "张三1"), 20 (1, "张三2"), 21 (2, "李四"), 22 (3, "王五"), 23 (4, "Tom"), 24 (5, "Gerry"), 25 (6, "莉莉") 26 ), 1) 27 28 val rdd2 = sc.parallelize(Array( 29 (1, "上海"), 30 (2, "北京1"), 31 (2, "北京2"), 32 (3, "南京"), 33 (4, "纽约"), 34 (6, "深圳"), 35 (7, "香港") 36 ), 1) 37 38 // 调用RDD API实现内连接 39 val joinResultRDD = rdd1.join(rdd2).map { 40 case (id, (name, address)) => { 41 (id, name, address) 42 } 43 } 44 println("----------------") 45 joinResultRDD.foreachPartition(iter => { 46 iter.foreach(println) 47 }) 48 // 调用RDD API实现左外连接 49 val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map { 50 case (id, (name, addressOption)) => { 51 (id, name, addressOption.getOrElse("NULL")) 52 } 53 } 54 println("----------------") 55 leftJoinResultRDd.foreachPartition(iter => { 56 iter.foreach(println) 57 }) 58 // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in) 59 println("----------------") 60 rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map { 61 case (id, (name, _)) => (id, name) 62 }.foreachPartition(iter => { 63 iter.foreach(println) 64 }) 65 66 // 右外连接 67 println("----------------") 68 rdd1 69 .rightOuterJoin(rdd2) 70 .map { 71 case (id, (nameOption, address)) => { 72 (id, nameOption.getOrElse("NULL"), address) 73 } 74 } 75 .foreachPartition(iter => iter.foreach(println)) 76 77 // 全外连接 78 println("----------------") 79 rdd1 80 .fullOuterJoin(rdd2) 81 .map { 82 case (id, (nameOption, addressOption)) => { 83 (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL")) 84 } 85 } 86 .foreachPartition(iter => iter.foreach(println)) 87 88 ///////////////////////////////////////////假设rdd2的数据比较少,将rdd2的数据广播出去/////////////////////////////////////// 89 val leastRDDCollection = rdd2.collect() 90 val broadcastRDDCollection = sc.broadcast(leastRDDCollection) 93 // Inner Join 95 rdd1 96 // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉 97 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1)) 98 // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用fla tMap 99 .flatMap { 100 case (id, name) => { 101 broadcastRDDCollection.value.filter(_._1 == id).map { 102 case (_, address) => { 103 (id, name, address) 104 } 105 } 106 } 107 } 108 .foreachPartition(iter => iter.foreach(println)) 109 110 // 左外连接 111 println("---------------------") 112 rdd1 113 .flatMap { 114 case (id, name) => { 115 // 从右表所属的广播变量中获取对应id的集合列表 116 val list = broadcastRDDCollection.value.filter(_._1 == id) 117 // 对应id的集合可能为空,也可能数据有多个 118 if (list.nonEmpty) { 119 // 存在多个 120 list.map(tuple => (id, name, tuple._2)) 121 } else { 122 // id在右表中不存在,填默认值 123 (id, name, "NULL") :: Nil 124 } 125 } 126 } 127 .foreachPartition(iter => iter.foreach(println)) 128 129 // 右外连接 130 /** 131 * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下 132 **/ 133 println("---------------------") 134 // 将rdd1中符合条件的数据过滤出来保存到driver中 135 val stage1 = rdd1 136 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1)) 137 .collect() 138 // 将driver中两个集合进行right join 139 val stage2 = leastRDDCollection.flatMap { 140 case (id, address) => { 141 val list = stage1.filter(_._1 == id) 142 if (list.nonEmpty) { 143 list.map(tuple => (id, tuple._2, address)) 144 } else { 145 Iterator.single((id, "NULL", address)) 146 } 147 } 148 } 149 stage2.foreach(println) 150 151 // TODO: 全外连接,不写代码,因为代码比较复杂 152 153 //==================================== 154 // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次 155 println("+++++++++++++++++") 156 println("-----------------------") 157 rdd1 158 .join(rdd2) 159 .map { 160 case (id, (name, _)) => (id, name) 161 } 162 .distinct() 163 .foreachPartition(iter => iter.foreach(println)) 164 println("------------------------") 165 rdd1 166 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1)) 167 .foreachPartition(iter => iter.foreach(println)) 168 169 // 休眠为了看4040页面 170 Thread.sleep(1000000) 171 } 172 }
6.
以上是关于RDD Join相关API,以及程序的主要内容,如果未能解决你的问题,请参考以下文章
Spark RDD groupByKey + join vs join 性能
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段