Spark SQL Dataframe API - 动态构建过滤条件
Posted
技术标签:
【中文标题】Spark SQL Dataframe API - 动态构建过滤条件【英文标题】:Spark SQL Dataframe API -build filter condition dynamically 【发布时间】:2017-12-03 11:45:27 【问题描述】:我有两个 Spark 数据框,df1
和 df2
:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| ramesh| 1212| 29|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
+------+-----+---+-----+
| eName| eNo|age| city|
+------+-----+---+-----+
|aarush|12121| 15|malmo|
|ramesh| 1212| 29|malmo|
+------+-----+---+-----+
我需要根据另一个文件中指定的列数从df1
获取不匹配记录。
例如,列查找文件如下所示:
df1col,df2col
name,eName
empNo, eNo
预期输出为:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
想法是如何为上述场景动态构建 where 条件,因为查找文件是可配置的,所以它可能有 1 到 n 个字段。
【问题讨论】:
【参考方案1】:您可以使用except
数据框方法。为简单起见,我假设要使用的列位于两个列表中。两个列表的顺序必须正确,列表中相同位置的列将被比较(不管列名)。在except
之后,使用join
从第一个数据帧中获取缺失的列。
val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15))
.toDF("name", "empNo", "age")
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo"))
.toDF("eName", "eNo", "age", "city")
val df1Cols = List("name", "empNo")
val df2Cols = List("eName", "eNo")
val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*)
.except(df2.select(df2Cols.head, df2Cols.tail: _*))
val df = df1.join(broadcast(tempDf), df1Cols)
生成的数据框将看起来像想要的那样:
+-------+-----+---+
| name|empNo|age|
+-------+-----+---+
| aarush| 0707| 15|
| suresh| 1111| 30|
|shankar|12121| 28|
+-------+-----+---+
【讨论】:
如果数据集很大怎么办......这是一种有效的方法......还是我们可以在连接本身中做到这一点? @Shankar 对于这个问题,我相信某种类型的join
始终是必要的,因此它不会比这更有效。但是,由于tempDf
的大小始终等于或小于df1
,因此我更改了它们在连接中的位置并为tempDf
添加了broadcast
。这将向 Spark 暗示广播数据帧可能是一个好主意,这将在大多数情况下提高效率。
谢谢你,我的问题是我们可以在 join 本身中添加条件,而不是先加入,而不是先加入,我完全同意你的解决方案有效,我喜欢它,只是想了解在 join 本身中,我们可以创建一个 where 条件并实现相同的效果。
@Shankar 啊,我明白了。我不认为你可以用 where 解决这个问题,因为你有可变数量的列。无论如何,Spark 都会优化执行计划,所以我相信结果会差不多。【参考方案2】:
如果您从 SQL 查询中执行此操作,我会使用 Changing a SQL column title via query 之类的内容重新映射 SQL 查询本身中的列名。您可以在查询中进行简单的文本替换,以将它们规范化为 df1 或 df2 列名。
一旦你有了,你可以使用类似的东西来区分 How to obtain the difference between two DataFrames?
如果您需要更多不会在差异中使用的列(例如年龄),您可以根据差异结果再次重新选择数据。这可能不是最佳的做法,但它可能会奏效。
【讨论】:
以上是关于Spark SQL Dataframe API - 动态构建过滤条件的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL Dataframe API - 动态构建过滤条件
APACHE SPARK 2.0 API IMPROVEMENTS: RDD, DATAFRAME, DATASET AND SQL