Spark SQL Dataframe API - 动态构建过滤条件

Posted

技术标签:

【中文标题】Spark SQL Dataframe API - 动态构建过滤条件【英文标题】:Spark SQL Dataframe API -build filter condition dynamically 【发布时间】:2017-12-03 11:45:27 【问题描述】:

我有两个 Spark 数据框,df1df2

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| ramesh| 1212| 29|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
+------+-----+---+-----+
| eName|  eNo|age| city|
+------+-----+---+-----+
|aarush|12121| 15|malmo|
|ramesh| 1212| 29|malmo|
+------+-----+---+-----+

我需要根据另一个文件中指定的列数从df1 获取不匹配记录

例如,列查找文件如下所示:

df1col,df2col
name,eName
empNo, eNo

预期输出为:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+

想法是如何为上述场景动态构建 where 条件,因为查找文件是可配置的,所以它可能有 1 到 n 个字段。

【问题讨论】:

【参考方案1】:

您可以使用except 数据框方法。为简单起见,我假设要使用的列位于两个列表中。两个列表的顺序必须正确,列表中相同位置的列将被比较(不管列名)。在except 之后,使用join 从第一个数据帧中获取缺失的列。

val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15))
  .toDF("name", "empNo", "age")
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo"))
  .toDF("eName", "eNo", "age", "city")

val df1Cols = List("name", "empNo")
val df2Cols = List("eName", "eNo")

val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*)
  .except(df2.select(df2Cols.head, df2Cols.tail: _*))    
val df = df1.join(broadcast(tempDf), df1Cols)

生成的数据框将看起来像想要的那样:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
| aarush| 0707| 15|
| suresh| 1111| 30|
|shankar|12121| 28|
+-------+-----+---+

【讨论】:

如果数据集很大怎么办......这是一种有效的方法......还是我们可以在连接本身中做到这一点? @Shankar 对于这个问题,我相信某种类型的join 始终是必要的,因此它不会比这更有效。但是,由于tempDf 的大小始终等于或小于df1,因此我更改了它们在连接中的位置并为tempDf 添加了broadcast。这将向 Spark 暗示广播数据帧可能是一个好主意,这将在大多数情况下提高效率。 谢谢你,我的问题是我们可以在 join 本身中添加条件,而不是先加入,而不是先加入,我完全同意你的解决方案有效,我喜欢它,只是想了解在 join 本身中,我们可以创建一个 where 条件并实现相同的效果。 @Shankar 啊,我明白了。我不认为你可以用 where 解决这个问题,因为你有可变数量的列。无论如何,Spark 都会优化执行计划,所以我相信结果会差不多。【参考方案2】:

如果您从 SQL 查询中执行此操作,我会使用 Changing a SQL column title via query 之类的内容重新映射 SQL 查询本身中的列名。您可以在查询中进行简单的文本替换,以将它们规范化为 df1 或 df2 列名。

一旦你有了,你可以使用类似的东西来区分 How to obtain the difference between two DataFrames?

如果您需要更多不会在差异中使用的列(例如年龄),您可以根据差异结果再次重新选择数据。这可能不是最佳的做法,但它可能会奏效。

【讨论】:

以上是关于Spark SQL Dataframe API - 动态构建过滤条件的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL Dataframe API - 动态构建过滤条件

APACHE SPARK 2.0 API IMPROVEMENTS: RDD, DATAFRAME, DATASET AND SQL

Spark-SQL之DataFrame操作大全

java的怎么操作spark的dataframe

DataFrame编程模型初谈与Spark SQL

Spark-SQL之DataFrame操作