How to pass columns as values to a UDF in Spark Scala to check a condition
Asked: 2018-05-14 06:46:14
This is my DataFrame:
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear TimeStamp
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 1 UpdateReason2UpdateIsNowUPdated 505074 3019680 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 308 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 O|!| Japan 2017 2018-05-10T10:21:50+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 O|!| Japan 2017 2018-05-10T10:21:50+00:00
192730230775 308 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 308 179 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T09:27:11+00:00
192730230775 308 181 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 308 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 O|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 O|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:17:37+00:00
192730230775 308 181 BAL 500186 6 ReasonToDeleteRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:17:37+00:00
192730230775 298 180 BAL 500186 6 UpdateReasonToUpdateRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:17:37+00:00
192730230775 298 181 BAL 500186 6 ReasonToDeleteRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:17:37+00:00
192730230775 298 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 I|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 298 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 I|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 298 180 BAL 500186 6 UpdateReasonToUpdateRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:16:31+00:00
192730230775 298 181 BAL 500186 6 ReasonToDeleteRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:16:31+00:00
192730230775 298 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 I|!| Japan 2017 2018-05-10T10:21:50+00:00
192730230775 298 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 I|!| Japan 2017 2018-05-10T10:21:50+00:00
192730230775 312 181 BAL 500186 null null null null O|!| Japan 2018 2018-05-10T09:39:43+00:00
192730230775 310 181 INC 500186 null null null null D|!| Japan 9999 2018-05-10T08:21:26+00:00
192730230775 310 182 INC 500186 null null null null O|!| Japan 2018 2018-05-10T08:30:53+00:00
192730230775 298 181 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T10:22:55+00:00
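This is the logic that should produce the expected output:
If "FFAction|!|" === "I|!|", group by the first 6 columns and take the latest row by timestamp.
If "FFAction|!|" === "O|!|" and $"UpdateReason_updateReasonId" === "null", or "FFAction|!|" === "D|!|", group by the first 5 columns and take the latest row by timestamp.
If one row has "FFAction|!|" === "I|!|" and another has "FFAction|!|" === "O|!|", group by the first five columns and take the latest row.
The same applies if one row has "FFAction|!|" === "I|!|" and another has "FFAction|!|" === "D|!|": group by the first five columns and take the latest row.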
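One possible reading of the four rules above, sketched with plain Scala collections rather than Spark so that the grouping decision is easy to follow. The names `Rec`, `GroupingRules`, `collapses`, and `latest` are hypothetical, `key` stands in for the first five columns, and `None` models a null UpdateReason_updateReasonId. It also assumes that all timestamps carry the same offset, so that string order matches time order.

```scala
// Hypothetical, simplified record: only the fields that the rules mention.
final case class Rec(
    key: String,              // stands in for the first five columns
    reasonId: Option[String], // UpdateReason_updateReasonId; None models null
    action: String,           // FFAction|!|
    timestamp: String         // ISO-8601 text, same offset everywhere (assumption)
)

object GroupingRules {
  // True when rows sharing the first five columns should collapse to one row:
  // a D row, an O row without a reason id, or a mix of I and O rows.
  def collapses(rows: Seq[Rec]): Boolean = {
    val actions = rows.map(_.action).toSet
    actions.contains("D|!|") ||
    rows.exists(r => r.action == "O|!|" && r.reasonId.isEmpty) ||
    (actions.contains("I|!|") && actions.contains("O|!|"))
  }

  // Latest row per five-column group, or per (five columns + reason id)
  // when the group does not collapse.
  def latest(rows: Seq[Rec]): Seq[Rec] =
    rows.groupBy(_.key).values.toSeq.flatMap { group =>
      if (collapses(group)) Seq(group.maxBy(_.timestamp))
      else group.groupBy(_.reasonId).values.map(_.maxBy(_.timestamp)).toSeq
    }
}
```

Whether a group of O rows that all have a reason id should stay on six columns is an interpretation taken from Logic Example 1, not something the rules state outright.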
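Here is my expected output, with the logic explained.
Logic Example 1:
Take PeriodId 308 as an example; it has 10 rows in total. One row has PeriodId 308 and SourceId 179 and is completely distinct, so it will be in the output. PeriodId 308 with SourceId 181 has two rows that are identical up to the 5th column, and one of them has O, so we need to group by 5 columns and take the latest. Finally, PeriodId 308 with SourceId 180 has 7 rows that are identical up to the 5th column, and none of them has a null UpdateReason_updateReasonId, so the group by should be on 6 columns.
So the latest rows will be: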
192730230775 308 179 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T09:27:11+00:00
192730230775 308 181 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 O|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 308 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 O|!| Japan 2017 2018-05-10T10:27:09+00:00
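So this should be the final output for PeriodId 308.
Logic Example 2:
Similarly, PeriodId 297 has 9 rows.
It has three combinations, PeriodId 297 with SourceId 180, 181, and 182, so there will be three groups of rows. Because 297 and 181 share the same first 5 columns and SourceId is not null, the group by should be on 6 columns, so we will have two unique records based on the latest timestamp. In the same way, 297 and 180 have no null SourceId, so we group by 6 columns and take the latest by TimeStamp.
Similarly, 297 and 182 have three similar rows, but SourceId is null, so the group by will be on 5 columns and we need to take the latest.
So this is the final output for 297: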
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
Here is my code, which does the same thing except for the last piece of logic:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
def containsActionUdf = udf((array: Seq[String]) => array.contains("O|!|") || array.contains("D|!|"))
val latestForEachKey2 = tempReorder.withColumn("group", when(containsActionUdf(collect_list("FFAction|!|").over(windowSpec)) && ($"UpdateReason_updateReasonId" === "null") , lit("same")).otherwise($"UpdateReason_updateReasonId"))
.withColumn("rank", row_number().over(windowSpec2))
.filter($"rank" === 1).drop("rank", "group")
This is the output I get; it has an extra row.
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|192730230775 |297 |181 |INC |500186 |1 |UpdateReason2UpdateIsNowUPdated |505074 |3019680 |I|!| |Japan |2017 |2018-05-10T10:08:01+00:00|
|192730230775 |297 |181 |INC |500186 |4 |New Reason Added |505074 |3019683 |I|!| |Japan |2017 |2018-05-10T10:08:01+00:00|
|192730230775 |308 |179 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T09:27:11+00:00|
|192730230775 |298 |181 |BAL |500186 |6 |ReasonToDeleteRevised |505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:17:37+00:00|
|192730230775 |298 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:22:55+00:00|
|192730230775 |297 |182 |INC |500186 |6 |UpdateReasonToDelete |505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:00:40+00:00|
|192730230775 |297 |182 |INC |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:11:15+00:00|
|192730230775 |308 |180 |BAL |500186 |1 |RevisedReasonAdded |505074 |3019680 |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00|
|192730230775 |308 |180 |BAL |500186 |6 |UpdateReasonToUpdateRevisedisNowUpdated|505074 |3019685 |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00|
|192730230775 |310 |181 |INC |500186 |null |null |null |null |D|!| |Japan |9999 |2018-05-10T08:21:26+00:00|
|192730230775 |308 |181 |BAL |500186 |6 |ReasonToDeleteRevised |505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:17:37+00:00|
|192730230775 |308 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00|
|192730230775 |298 |180 |BAL |500186 |1 |RevisedReasonAdded |505074 |3019680 |I|!| |Japan |2017 |2018-05-10T10:22:55+00:00|
|192730230775 |298 |180 |BAL |500186 |6 |UpdateReasonToUpdateRevisedisNowUpdated|505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:22:55+00:00|
|192730230775 |312 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2018 |2018-05-10T09:39:43+00:00|
|192730230775 |310 |182 |INC |500186 |null |null |null |null |O|!| |Japan |2018 |2018-05-10T08:30:53+00:00|
|192730230775 |297 |180 |INC |500186 |6 |InsertUpdateReason |505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:00:40+00:00|
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
The final output should be:
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 308 179 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T09:27:11+00:00
192730230775 308 181 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 308 180 BAL 500186 6 UpdateReasonToUpdateRevisedisNowUpdated 505074 3019685 O|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 308 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 O|!| Japan 2017 2018-05-10T10:27:09+00:00
192730230775 298 180 BAL 500186 6 UpdateReasonToUpdateRevised 505074 3019685 I|!| Japan 2017 2018-05-10T10:16:31+00:00
192730230775 298 180 BAL 500186 1 RevisedReasonAdded 505074 3019680 I|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 298 181 BAL 500186 null null null null O|!| Japan 2017 2018-05-10T10:22:55+00:00
192730230775 312 181 BAL 500186 null null null null O|!| Japan 2018 2018-05-10T09:39:43+00:00
192730230775 310 181 INC 500186 null null null null D|!| Japan 9999 2018-05-10T08:21:26+00:00
192730230775 310 182 INC 500186 null null null null O|!| Japan 2018 2018-05-10T08:30:53+00:00
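Comments:
Can you explain which row is extra?
@RameshMaharjan Let's take PeriodId 308 and SourceId 180 as an example. It has 7 records in total, of which my output should have only two; for PeriodId 297 and SourceId 182 I also get the latest one.
@RameshMaharjan The solution does not work after adding && $"UpdateReason_updateReasonId" === "null".
@RameshMaharjan I have explained the logic with two examples. I hope that helps with understanding it better. Thanks.
Do we have to check whether SourceId is null, or whether UpdateReason_updateReasonId is null? In the logic section you mention UpdateReason_updateReasonId, and in Logic Example 2 you mention SourceId. Please explain clearly.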
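Answer 1:
After understanding your logic, it seems that you are checking the wrong column in the udf function. You should check UpdateReason_updateReasonId for null values, as follows: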
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
//window over the first five columns, used to check whether the group contains a null UpdateReason_updateReasonId
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
//window for filtering out the latest after applying the group defined in previous window
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
//udf to check whether the collected UpdateReason_updateReasonId values of the group contain a null
def containsUdf = udf((array: Seq[String]) => array.contains("null") || array.contains("NULL") || array.contains(null))
//applying the window and udf functions and filtering in the latest
val latestForEachKey1 = tempReorder.withColumn("group", when(containsUdf(collect_list("UpdateReason_updateReasonId").over(windowSpec)), lit("same")).otherwise($"UpdateReason_updateReasonId"))
.withColumn("rank", row_number().over(windowSpec2))
.filter($"rank" === 1).drop("rank", "group")
latestForEachKey1.show(false)
This should give you:
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|192730230775 |297 |181 |INC |500186 |1 |UpdateReason2UpdateIsNowUPdated |505074 |3019680 |I|!| |Japan |2017 |2018-05-10T10:08:01+00:00 |
|192730230775 |297 |181 |INC |500186 |4 |New Reason Added |505074 |3019683 |I|!| |Japan |2017 |2018-05-10T10:08:01+00:00 |
|192730230775 |308 |179 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T09:27:11+00:00 |
|192730230775 |298 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:22:55+00:00 |
|192730230775 |297 |182 |INC |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:11:15+00:00 |
|192730230775 |308 |180 |BAL |500186 |1 |RevisedReasonAdded |505074 |3019680 |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00 |
|192730230775 |308 |180 |BAL |500186 |6 |UpdateReasonToUpdateRevisedisNowUpdated|505074 |3019685 |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00 |
|192730230775 |310 |181 |INC |500186 |null |null |null |null |D|!| |Japan |9999 |2018-05-10T08:21:26+00:00 |
|192730230775 |308 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2017 |2018-05-10T10:27:09+00:00 |
|192730230775 |298 |180 |BAL |500186 |1 |RevisedReasonAdded |505074 |3019680 |I|!| |Japan |2017 |2018-05-10T10:22:55+00:00 |
|192730230775 |298 |180 |BAL |500186 |6 |UpdateReasonToUpdateRevisedisNowUpdated|505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:21:50+00:00 |
|192730230775 |312 |181 |BAL |500186 |null |null |null |null |O|!| |Japan |2018 |2018-05-10T09:39:43+00:00 |
|192730230775 |310 |182 |INC |500186 |null |null |null |null |O|!| |Japan |2018 |2018-05-10T08:30:53+00:00 |
|192730230775 |297 |180 |INC |500186 |6 |InsertUpdateReason |505074 |3019685 |I|!| |Japan |2017 |2018-05-10T10:00:40+00:00 |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
I guess this is the expected result. I hope the answer is helpful.
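As a possible alternative to the udf, the same check can be sketched with built-in window functions only. One thing to keep in mind: `collect_list` skips real null values, so `array.contains(null)` can only match when the column holds the literal string "null". The sketch below tests for both cases directly on the column. The object and method names (`LatestPerGroup`, `latestPerGroup`) are hypothetical, and it assumes that TimeStamp is an ISO-8601 string that Spark can cast to a timestamp.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LatestPerGroup {
  // First five columns of the question's DataFrame.
  val keyCols: Seq[String] = Seq(
    "uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")

  def latestPerGroup(df: DataFrame): DataFrame = {
    val reason = col("UpdateReason_updateReasonId")
    // Window over the first five columns.
    val byKey = Window.partitionBy(keyCols.map(col): _*)
    // A reason id counts as missing if it is a real null or the text "null"/"NULL".
    val isMissing = reason.isNull || lower(reason) === "null"
    // 1 if any row in the five-column group has a missing reason id.
    val groupHasMissing = max(when(isMissing, 1).otherwise(0)).over(byKey) === 1
    // Window over the five columns plus the helper column, newest row first.
    val byKeyAndGroup = Window
      .partitionBy((keyCols :+ "group").map(col): _*)
      .orderBy(col("TimeStamp").cast("timestamp").desc)

    df.withColumn("group", when(groupHasMissing, lit("same")).otherwise(reason))
      .withColumn("rank", row_number().over(byKeyAndGroup))
      .filter(col("rank") === 1)
      .drop("rank", "group")
  }
}
```

It would be called as `LatestPerGroup.latestPerGroup(tempReorder)`. Like the udf version, it collapses a five-column group to a single row only when that group contains a missing reason id, and otherwise keeps the latest row per reason id.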