Spark join 产生错误的结果

Posted

技术标签:

【中文标题】Spark join 产生错误的结果【英文标题】:Spark join produces wrong results 【发布时间】:2016-03-21 13:09:39 【问题描述】:

在可能提交错误之前先在此处展示。我正在使用 Spark 1.6.0。

这是我正在处理的问题的简化版本。我过滤了一个表,然后尝试对该子集和主表进行左外连接,匹配所有列。

我在主表中只有 2 行,在过滤表中只有 1 行。我希望结果表只有子集中的单行。

scala> val b = Seq(("a", "b", 1), ("a", "b", 2)).toDF("a", "b", "c")
b: org.apache.spark.sql.DataFrame = [a: string, b: string, c: int]

scala> val a = b.where("c = 1").withColumnRenamed("a", "filta").withColumnRenamed("b", "filtb")
a: org.apache.spark.sql.DataFrame = [filta: string, filtb: string, c: int]

scala> a.join(b, $"filta" <=> $"a" and $"filtb" <=> $"b" and a("c") <=> b("c"), "left_outer").show
+-----+-----+---+---+---+---+
|filta|filtb|  c|  a|  b|  c|
+-----+-----+---+---+---+---+
|    a|    b|  1|  a|  b|  1|
|    a|    b|  1|  a|  b|  2|
+-----+-----+---+---+---+---+

我完全没想到会有这样的结果。我期待第一行,但不是第二行。我怀疑这是 null-safe 平等,所以我没有尝试过。

scala> a.join(b, $"filta" === $"a" and $"filtb" === $"b" and a("c") === b("c"), "left_outer").show
16/03/21 12:50:00 WARN Column: Constructing trivially true equals predicate, 'c#18232 = c#18232'. Perhaps you need to use aliases.
+-----+-----+---+---+---+---+
|filta|filtb|  c|  a|  b|  c|
+-----+-----+---+---+---+---+
|    a|    b|  1|  a|  b|  1|
+-----+-----+---+---+---+---+

好的,这是我预期的结果,但后来我对警告产生了怀疑。这里有一个单独的 *** 问题来处理该警告:Spark SQL performing carthesian join instead of inner join

所以我创建了一个新列来避免警告。

scala> a.withColumn("newc", $"c").join(b, $"filta" === $"a" and $"filtb" === $"b" and $"newc" === b("c"), "left_outer").show
+-----+-----+---+----+---+---+---+
|filta|filtb|  c|newc|  a|  b|  c|
+-----+-----+---+----+---+---+---+
|    a|    b|  1|   1|  a|  b|  1|
|    a|    b|  1|   1|  a|  b|  2|
+-----+-----+---+----+---+---+---+

但是现在结果又错了! 我有很多 null 安全相等检查,并且警告不是致命的,所以我看不到使用/解决此问题的明确路径。

该行为是错误,还是这是预期的行为?如果预期,为什么?

【问题讨论】:

【参考方案1】:

如果您想要预期的行为,请在名称上使用 join

val b = Seq(("a", "b", 1), ("a", "b", 2)).toDF("a", "b", "c")
val a = b.where("c = 1")

a.join(b, Seq("a", "b", "c")).show
// +---+---+---+
// |  a|  b|  c|
// +---+---+---+
// |  a|  b|  1|
// +---+---+---+

或别名:

val aa = a.alias("a")
val bb = b.alias("b")

aa.join(bb, $"a.a" === $"b.a" && $"a.b" === $"b.b" && $"a.c" === $"b.c")

您也可以使用&lt;=&gt;

aa.join(bb, $"a.a" <=> $"b.a" && $"a.b" <=> $"b.b" && $"a.c" <=> $"b.c")

据我所知,有一段时间简单相等的特殊情况。这就是为什么尽管出现警告,您仍会得到正确的结果。

第二个行为看起来确实像一个错误,与您的数据中仍然有 a.c 的事实有关。看起来它是在b.c 之前在下游选择的,并且评估的条件实际上是a.newc = a.c

val expr = $"filta" === $"a" and $"filtb" === $"b" and $"newc" === $"c"
a.withColumnRenamed("c", "newc").join(b, expr, "left_outer")

【讨论】:

我没有使用命名连接,因为没有实现空安全相等(这正是我想要的)。即使使用较重的语法,具有 null-safe 的别名版本似乎也是一种解决方法。我会为这种行为提交一个错误。 你能用 JIRA 链接 ping 我吗?

以上是关于Spark join 产生错误的结果的主要内容,如果未能解决你的问题,请参考以下文章

第37课:Spark中Shuffle详解及作业

Dataframes join 在 Spark Scala 中返回空结果

MySQL - 使用 LEFT JOIN 会产生意想不到的结果

两个 SQL LEFT JOINS 产生不正确的结果

Spark rdd.count() 产生不一致的结果

2-2spark的union和join操作演示