基于非空值加入火花数据框(scala)

Posted

技术标签:

【中文标题】基于非空值加入火花数据框(scala)【英文标题】:Join in spark dataframe (scala) based on not null values 【发布时间】:2017-08-22 10:54:17 【问题描述】:

假设我有两个数据框,如下所示:

首先 -

A    | B    | C    | D
1a   | 1b   | 1c   | 1d
2a   | null | 2c   | 2d
3a   | null | null | 3d
4a   | 4b   | null | null
5a   | null | null | null
6a   | 6b   | 6c   | null

第二个-

P    | B    | C    | D
1p   | 1b   | 1c   | 1d
2p   | 2b   | 2c   | 2d
3p   | 3b   | 3c   | 3d
4p   | 4b   | 4c   | 4d 
5p   | 5b   | 5c   | 5d
6p   | 6b   | 6c   | 6d 

join 操作是基于 "B", "C", "D" 进行的。如果这些列中的任何一个出现空值,它应该检查剩余列中出现的非空值。

所以,结果应该是-

P    | B    | C    | D    | A
1p   | 1b   | 1c   | 1d   | 1a
2p   | null | 2c   | 2d   | 2a
3p   | null | null | 3d   | 3a
4p   | 4b   | null | null | 4a // First(C) & First(D) was null so we take only B
6p   | 6b   | 6c   | null | 6a

任何人都可以为此查询提出任何解决方案吗? 目前我正在尝试过滤在单列、两列、三列中具有空值的值。然后将它们与 Second 一起加入,而无需占用该列。例如 - 我首先从 First 中过滤掉只有 B 为 null 的值。然后根据“C”和“D”将其与 Second 连接。 这样,我会得到很多数据帧,最后我会合并它们。

【问题讨论】:

【参考方案1】:

你可以这样做

import org.apache.spark.sql.functions._
df1.join(broadcast(df2), df1("B") === df2("B") || df1("C") === df2("C") || df1("D") === df2("D"))
  .drop(df2("B"))
  .drop(df2("C"))
  .drop(df2("D"))
  .show(false)

为了更安全,您可以broadcast dataframe,它的尺寸更小。

【讨论】:

您的解决方案在某种程度上是正确的。所以,我将您的解决方案扩展到了这个 - '"df1.join(broadcast(df2), (((df2("B")===null) || (df1("B") === df2("B "))) && ((df2("C")===null) || (df1("C") === df2("C"))) && ((df2("D")===空) || (df1("D") === df2("D"))))) .drop(df2("B")) .drop(df2("C")) .drop(df2(" D")) .show(false)" 这意味着它应该 df1(Col) 应该为空或等于 df2(Col) 但我仍然只得到单行 - 1a |1p |1b |1c |1d 但不是其他。知道为什么吗? 这不是您问题的解决方案吗? :) 呵呵。那是因为你正在使用 &&。你为什么使用df2("B")===null)? null 值在 df1 中,不是吗? 我不知道您要做什么,但您应该尝试` df1.join(broadcast(df2), ( ((df2("B")==="null") | | (df1("B") === df2("B"))) && ((df2("C")==="null") || (df1("C") === df2(" C"))) && ((df2("D")==="null") || (df1("D") === df2("D"))))) .drop(df2("B ")) .drop(df2("C")) .drop(df2("D")) .show(false)`。但是您应该知道哪个数据框具有空值。根据你的问题 df1 有空值而不是 df2【参考方案2】:

我认为左连接应该可以完成工作,请尝试以下代码:

val group = udf((p1: String, p2: String, p3: String) => if (p1 != null) p1 else if (p2 != null) p2 else if (p3 != null) p3 else null)
val joined = first.join(second.select("B", "P"), Seq("B"), "left")
                  .withColumnRenamed("P", "P1")
                  .join(second.select("C", "P"), Seq("C"), "left")
                  .withColumnRenamed("P", "P2")
                  .join(second.select("D", "P"), Seq("D"), "left")
                  .withColumnRenamed("P", "P3")
                  .select($"A", $"B", $"C", $"D", group($"P1", $"P2", $"P3") as "P")
                  .where($"P".isNotNull) 

希望对您有所帮助,否则请评论您的问题

【讨论】:

感谢您的回复。我已经编辑了我的问题。有一些错误。我已经尝试过使用您的解决方案。但我得到的是“null”值而不是“4a”(见注释行) 如果您尝试加入 B @Ishan 会怎样? 我必须首先基于 B、C 和 D 加入。如果任何一列为空,那么我将不得不基于另外两个加入。同样,如果两列同时具有空值,那么我将基于第三列加入。 @Ishan 我已经编辑了答案尝试并给出结果 我已经展示了我从您的解决方案中得到的结果。查看更新后的问题。

以上是关于基于非空值加入火花数据框(scala)的主要内容,如果未能解决你的问题,请参考以下文章

计算Spark DataFrame中的非空值的数量

从 Pyspark 中的数据框中计算空值和非空值

Spark / Scala - RDD填充最后一个非空值

我只需要在 pyspark 数据框中附加那些具有非空值的人

Scala DataFrame,将非空列的值复制到新列中

如何在 PySpark 中用该列的第一个非空值填充该列的空值