如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件

Posted

技术标签:

【中文标题】如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件【英文标题】:How to pass columns as value in UDF in Spark Scala for checking condition 【发布时间】:2018-05-14 06:46:14 【问题描述】:

这是我的数据框

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    308 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 I|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    312 181 BAL 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T09:39:43+00:00
192730230775    310 181 INC 500186  null    null    null    null    D|!|    Japan   9999    2018-05-10T08:21:26+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00
192730230775    298 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:22:55+00:00

这是通过预期输出得到的逻辑

如果 "FFAction|!|" === “我|!|”然后按前 6 列分组,需要 根据时间戳获取最新信息。

如果如果 "FFAction|!|" ===“哦|!|”和 $"UpdateReason_updateReasonId" === “空”或“FFAction|!|” === "D|!|"然后按前 5 列分组,然后 需要根据时间戳获取最新信息。

如果一行 "FFAction|!|" === “我|!|”和另一个“FFAction|!|” === “哦|!|”在那种情况下,按前五列分组,需要获取最新的 .

与如果一行 "FFAction|!|" 相同=== “我|!|”和另一个“FFAction|!|” === "D|!|"在这种情况下,按前五列分组,需要获取最新的。

这是我的预期输出和解释逻辑。

Logic Example 1:

让我们以 PeridoId 308 为例,它总共有 11 行。 现在一行有 PeriodId 308 和 SourceId 179 并且完全不同,因此它将在 output 中。 308 和 181 有两行相同,直到第 5 列,其中一行有 O 所以我们需要按 5 列分组并取最新和最新应该是 最后 308 和 180 有 7 列相似,直到第 5 行,并且在这种情况下,它没有 UpdateReason_updateReasonId 为 null,group by 应该在 6 列上。

这样最新的将是

192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00

所以这应该是 PeriodId 308 的最终输出。

Logic Example 2 :

类似 PeriodId 297 有 9 列。

现在它有三个组合 PeridoId 297 和 SourceId 180,181,182 所以会有三行。因为 297 和 181 有相似的 5 列,并且 SourceId 不为空,所以 group by 应该在 6 列上。 所以我们将有两个基于最新时间戳的唯一记录。 同样的方式 297 和 180 没有 SourceId 为空,因此按 6 列分组,按 Timestamp 分组。

同样,297 182 有三个相似的行,但 SourceId 为 null,因此 group by 将在 5 列上,需要获取最新的。

所以这是 297 的最终输出

192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00

这是我的代码,除了最后一个逻辑之外,它做同样的事情

导入 org.apache.spark.sql.expressions._ 导入 org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")

val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)


def containsActionUdf = udf 

  (array: Seq[String]) => (array.contains("O|!|") || array.contains("D|!|"))


val latestForEachKey2 = tempReorder.withColumn("group", when(containsActionUdf(collect_list("FFAction|!|").over(windowSpec)) && ($"UpdateReason_updateReasonId" === "null") , lit("same")).otherwise($"UpdateReason_updateReasonId"))
  .withColumn("rank", row_number().over(windowSpec2))
  .filter($"rank" === 1).drop("rank", "group")

这是我得到的输出,它有一个额外的行。

        +--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment                    |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp                |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|192730230775        |297     |181     |INC              |500186             |1                          |UpdateReason2UpdateIsNowUPdated        |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |297     |181     |INC              |500186             |4                          |New Reason Added                       |505074                        |3019683                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |308     |179     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T09:27:11+00:00|
|192730230775        |298     |181     |BAL              |500186             |6                          |ReasonToDeleteRevised                  |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:17:37+00:00|
|192730230775        |298     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |297     |182     |INC              |500186             |6                          |UpdateReasonToDelete                   |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00|
|192730230775        |297     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:11:15+00:00|
|192730230775        |308     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |308     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |310     |181     |INC              |500186             |null                       |null                                   |null                          |null                     |D|!|       |Japan        |9999         |2018-05-10T08:21:26+00:00|
|192730230775        |308     |181     |BAL              |500186             |6                          |ReasonToDeleteRevised                  |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:17:37+00:00|
|192730230775        |308     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |298     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |298     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |312     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T09:39:43+00:00|
|192730230775        |310     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T08:30:53+00:00|
|192730230775        |297     |180     |INC              |500186             |6                          |InsertUpdateReason                     |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00|
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+

就像最终输出应该是.. 最终输出..

192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    312 181 BAL 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T09:39:43+00:00
192730230775    310 181 INC 500186  null    null    null    null    D|!|    Japan   9999    2018-05-10T08:21:26+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

【问题讨论】:

你能解释一下哪一行是多余的吗? @RameshMaharjan 以免以 PeriodId 308 和 SourceId 180 为例。它总共有 7 条记录,其中我的输出应该只有两条,对于 PeriodId 297 和 SourceId 182,我也得到了最新的一条。 @RameshMaharjan 添加&& $"UpdateReason_updateReasonId" === "null" 后解决方案不起作用 @RameshMaharjan 我已经用两个例子解释了逻辑......我希望它应该有助于更好地理解......谢谢 是否必须检查 sourceID 为 null 或 UpdateReason_updateReasonId 为 null ?在逻辑部分中您提到了 UpdateReason_updateReasonId,在逻辑示例 2 中您提到了 sourceId。请解释清楚 【参考方案1】:

在理解了你的逻辑之后,好像你在udf函数中检查了错误的列。您应该检查UpdateReason_updateReasonId 的空值,如下所示

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

//window for checking if O|!| is present in the group
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
//window for filtering out the latest after applying the group defined in previous window
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

//udf to check if the group has O|!| or not
def containsUdf = udf(array: Seq[String])=> array.contains("null") || array.contains("NULL") || array.contains(null)

//applying the window and udf functions and filtering in the latest
val latestForEachKey1 = tempReorder.withColumn("group", when(containsUdf(collect_list("UpdateReason_updateReasonId").over(windowSpec)), lit("same")).otherwise($"UpdateReason_updateReasonId"))
                                    .withColumn("rank", row_number().over(windowSpec2))
                                    .filter($"rank" === 1).drop("rank", "group")
latestForEachKey1.show(false)

这应该给你

+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment                    |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp                 |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|192730230775        |297     |181     |INC              |500186             |1                          |UpdateReason2UpdateIsNowUPdated        |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00 |
|192730230775        |297     |181     |INC              |500186             |4                          |New Reason Added                       |505074                        |3019683                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00 |
|192730230775        |308     |179     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T09:27:11+00:00 |
|192730230775        |298     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00 |
|192730230775        |297     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:11:15+00:00 |
|192730230775        |308     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00 |
|192730230775        |308     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:000|
|192730230775        |310     |181     |INC              |500186             |null                       |null                                   |null                          |null                     |D|!|       |Japan        |9999         |2018-05-10T08:21:26+00:00 |
|192730230775        |308     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00 |
|192730230775        |298     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00 |
|192730230775        |298     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:21:50+00:000|
|192730230775        |312     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T09:39:43+00:00 |
|192730230775        |310     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T08:30:53+00:00 |
|192730230775        |297     |180     |INC              |500186             |6                          |InsertUpdateReason                     |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00 |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+

我猜这是预期的结果。希望回答对你有帮助

【讨论】:

以上是关于如何在 Spark Scala 的 UDF 中将列作为值传递以检查条件的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark SQL 中将额外参数传递给 UDF?

使用 UDF 及其性能的 Spark Scala 数据集验证

Scala Spark - 不支持 udf 列

如何在scala spark中将数据框的特定列与另一个列连接[重复]

SPARK 数据框错误:在使用 UDF 拆分列中的字符串时无法转换为 scala.Function2

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]