根据最大 Spark Scala 替换列值

Posted

技术标签:

【中文标题】根据最大 Spark Scala 替换列值【英文标题】:Replace column values based on the max Spark Scala 【发布时间】:2021-11-16 22:47:03 【问题描述】:

假设我有一个如下所示的数据集:

val data1 = Seq(
  ("81518165", "10", "0412221432", "2021.02.01 12:29:57"),
  ("81518165", "10", "0412392873", "2021.02.01 11:33:41"),
  ("81518165", "10", "0412392879", "2021.02.01 05:12:12"),
  ("81518165", "10", "0412392950", "2021.02.01 01:39:37"),
  ("23698102", "12", "0412221432", "2021.02.14 12:55:33"),
  ("23698102", "12", "0412392873", "2021.02.14 11:33:37"),
  ("23698102", "12", "0412392879", "2021.02.14 05:12:00")
)

val df1 = data1.toDF("AUFTRAG", "AUFTRAG_POS", "IID_CODE", "ERST_TIMESTAMP")

我想删除由日期引起的重复行,方法是根据最大日期“ERST_TIMESTAMP”聚合“AUFTRAG”和“AUFTRAG_POS”列。 要获得最大日期,这是我的代码:

df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .groupBy("AUFTRAG", "AUFTRAG_POS")
  .agg(max("ERST_TIMESTAMP"))
  .show()

这是预期的结果:

+--------+-----------+-------------------+
| AUFTRAG|AUFTRAG_POS|max(ERST_TIMESTAMP)|
+--------+-----------+-------------------+
|81518165|         10|2021-02-01 12:29:57|
|23698102|         12|2021-02-14 12:55:33|
+--------+-----------+-------------------+

我现在的目标是在这个最大日期之前替换按“AUFTRAG”和“AUFTRAG_POS”分组的 ERST_TIMESTAMP。这是我的解决方案:

val df2 = df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .groupBy("AUFTRAG", "AUFTRAG_POS")
  .agg(max("ERST_TIMESTAMP"))


df1.join(df2, Seq("AUFTRAG", "AUFTRAG_POS")).show()

预期的结果,正是我想要的:

我对这种方法不是很满意。还有其他方法吗?有什么帮助吗?

【问题讨论】:

【参考方案1】:

您可以使用Window 函数,如下所示

import org.apache.spark.sql.functions._
val window = Window.partitionBy("AUFTRAG", "AUFTRAG_POS")

df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .withColumn("ERST_TIMESTAMP", max("ERST_TIMESTAMP").over(window))
  .show(false)

输出:

+--------+-----------+----------+-------------------+
|AUFTRAG |AUFTRAG_POS|IID_CODE  |ERST_TIMESTAMP     |
+--------+-----------+----------+-------------------+
|81518165|10         |0412221432|2021-02-01 12:29:57|
|81518165|10         |0412392873|2021-02-01 12:29:57|
|81518165|10         |0412392879|2021-02-01 12:29:57|
|81518165|10         |0412392950|2021-02-01 12:29:57|
+--------+-----------+----------+-------------------+

【讨论】:

以上是关于根据最大 Spark Scala 替换列值的主要内容,如果未能解决你的问题,请参考以下文章

从集合中随机替换 spark 数据集列值

为啥 Spark Dataset.select 替换列值

使用 Map 替换 Spark 中的列值

如何使用scala在Apache spark中用空字符串(“”)替换空值[重复]

在简单的正则表达式替换中获取 NPE(Spark 上的 Scala)

根据 NaN 将列值替换为 0 或 1 [重复]