Spark - LeftOuterJoin 结果条数与左表条数不一致
Posted BIT_666
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark - LeftOuterJoin 结果条数与左表条数不一致相关的知识,希望对你有一定的参考价值。
一.引言
使用 spark lefOuterJoin 寻找下发的 gap,用原始下发 rdd 左join 真实下发后发现最终的结果数与左表不一致,左表数据: 20350,最终数据: 25721。一直以来使用 Hive 都是默认 leftJoin 左表应该与结果一致,所以开始排查。
二.问题排查
20350 条变成 25721 条数据,所以大概率是出现了同 key 的情况,分别检查两边的数据,发现左表、右表均有相同的下发记录,所以导致最终进入循环的数目 countNum 超过了左表的行数,为了避免之后再遇到这样的问题,下面遍历下常见的情况,先初始化一个 SaprkContext 并添加 3对 pairRdd,其中 rddA,rddC 存在重复 key,rddB 无重复 key:
val conf = new SparkConf().setAppName("TestLefterJoin").setMaster("local[5]")
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("error")
val rddA = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6")))
val rddB = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
val rddC = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7")))
1.左表 key 有重复
rddA.leftOuterJoin(rddB).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
左表 (A,1),(A,6) 重复,二者分别与右表的 (A, 1) 匹配,所以分别得到 (A, 1, 1) 和 (A, 6, 1) ,如果右表没有 "A" 的 key,匹配结果是 (A, 1, NULL) 与 (A, 6, NULL)
(B,2,2)
(D,4,4)
(E,5,5)
(A,1,1)
(C,3,3)
(A,6,1)
结论:左表有重复 left join 后结果与左表行数一致
2.右表 key 有重复
rddB.leftOuterJoin(rddA).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
将上述 rddA 与 rddB 对调得到右表有重复的结果,(A, 1) 分别有右表 (A, 1) 与 (A, 6) 匹配得到 (A, 1, 1) 与 (A, 1, 6),结果一对多
(A,1,1)
(C,3,3)
(E,5,5)
(B,2,2)
(D,4,4)
(A,1,6)
结论:右表有重复 left join 后结果与左表行数不一致,增加行数为右表重复 key 的数 - 1
3.左右表 key 都有重复
rddA.leftOuterJoin(rddC).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
左表 (A,1) 、(A,6) 与右表 (A,1)、(A,7) 直接得到 2x2 四种匹配,比左表多2条数据
(B,2,2)
(C,3,3)
(E,5,5)
(A,1,1)
(D,4,4)
(A,1,7)
(A,6,1)
(A,6,7)
结论:左右表有重复 left join 后结果与左表行数不一致, 增加行数为右表重复 key 的数目
4.左表 key 有 null 且重复
val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
val rddBNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5")))
rddANull.leftOuterJoin(rddBNull).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
左表的 (null, 7) , (null, 8) 会把 null 当做单独的 key 匹配,所以不影响
(B,2,2)
(E,5,5)
(null,7,NULL)
(C,3,3)
(A,1,1)
(A,6,1)
(D,4,4)
结论:左表有重复 null key 不影响 left join 与行数
5.右表 key 有 null 且重复
val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
val rddCNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "7"), (null, "8")))
rddCNull.leftOuterJoin(rddANull).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
左表 (null, 8) 与右表 (null,7)、(null,8) 匹配得到两条记录。
(B,2,2)
(C,3,3)
(D,4,4)
(E,5,5)
(null,8,7)
(null,8,8)
(A,1,1)
(A,1,6)
(A,7,1)
(A,7,6)
结论:右表有重复 null key 影响 left join 行数,增加数目为右表重复 key 数 - 1
6.左右表都有重复 null key
val rddANull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "7"), (null, "8")))
rddANull.leftOuterJoin(rddANull).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
两边都有 (null,7)、(null,8) ,和上面正常 key 左右表重复结果相同,多2条记录
(B,2,2)
(D,4,4)
(E,5,5)
(null,7,7)
(null,7,8)
(null,8,7)
(null,8,8)
(A,1,1)
(A,1,6)
(A,6,1)
(A,6,6)
(C,3,3)
结论:左右均重复 null key 时影响 left join 行数,其中增加行数为重复 null key的数
Tips:
经过上面3次试验可以看到 null 作为 pairRdd 的 key 在进行 join 时和正常的 key join 时是一样的,唯一的区别是处理这类型的 key 时需要注意非 null 的判断,否则容易报错
7.表中包含纯 null
val rddDNull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), (null, "9"), (null, "10")))
val rddENull = sc.parallelize(Array(("A", "1"), ("B", "2"), ("C", "3"), ("D", "4"), ("E", "5"), ("A", "6"), null))
rddENull.leftOuterJoin(rddDNull).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
rddDNull.leftOuterJoin(rddENull).foreach(info =>
println(info._1, info._2._1, info._2._2.getOrElse("NULL"))
)
不管是左表有纯 null 还是右表有纯 null 或者都有 null,都会报错 NullPoint:
结论:pairRdd 中有纯 null 使用 join 会报错
三.问题修复
上面遍历了重复和 null 的问题,主要导致左join与左表条数不一致的原因还是右表重复key导致,所以问题修复主要是去重:
A.distinct
直接对 rdd 全局去重,但是只能去除相同的 (key, value)
B.groupByKey
将 (key, value1)、(key, value2) .... 相同 key 的 pairRdd 元素聚合
上述两种方法是 PairRdd 常用的去重方法,不过怎么去重还需要结合业务场景,如果确实是相同的多余日志则使用 distinct,如果确实有重复日志且需要聚合信息则采用 groupByKey 、reduceByKey 等聚合方式,当然如果左右表都有重复且场景确需,正常 join 即可。
四.总结
这里 spark pairRdd leftJoin 可能增加结果的行数,使用 spark DataFrame 使用 join 时:
val sqlContext = new SQLContext(sc)
documentDFA.join(documentDFB).select("xxx").where("xxx")
使用 select + where 得到的结果不一定会大于等于左表行数。再回看一下引言的数据,左表数据: 20350,最终数据: 25721,共增加了 5371 行,如果右表单独重复 Xi 个 key,每个 key 重复数目 Mi 个,左右表共重复 Yi 个 key,每个 key 重复数目 Ni 个,按照上面的公式应该满足:
以上是关于Spark - LeftOuterJoin 结果条数与左表条数不一致的主要内容,如果未能解决你的问题,请参考以下文章
Spark RDD常用算子操作 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin