如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null

Posted

技术标签:

【中文标题】如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null【英文标题】:How to drop duplicates from PySpark Dataframe and change the remaining column value to null 【发布时间】:2020-01-08 14:20:45 【问题描述】:

我是 Pyspark 的新手。 我有一个 Pyspark 数据框,我想根据 id 和时间戳列删除重复项。然后我想将重复 id 的读取值替换为 null。我不想使用熊猫。请看下面:

数据框:

id       reading      timestamp
1        13015        2018-03-22 08:00:00.000        
1        14550        2018-03-22 09:00:00.000
1        14570        2018-03-22 09:00:00.000
2        15700        2018-03-22 08:00:00.000
2        16700        2018-03-22 09:00:00.000
2        18000        2018-03-22 10:00:00.000

期望的输出:

id       reading      timestamp
1        13015        2018-03-22 08:00:00.000        
1        Null         2018-03-22 09:00:00.000
2        15700        2018-03-22 08:00:00.000
2        16700        2018-03-22 09:00:00.000
2        18000        2018-03-22 10:00:00.000

我需要如何添加到这段代码中:

df.dropDuplicates(['id','timestamp'])

任何帮助将不胜感激。非常感谢

【问题讨论】:

【参考方案1】:

一种使用 Window 函数计算分区 id, timestamp 上的重复项然后根据计数更新 reading 的方法:

from pyspark.sql import Window

w = Window.partitionBy("id", "timestamp").orderBy("timestamp")

df.select(col("id"),
          when(count("*").over(w) > lit(1), lit(None)).otherwise(col("reading")).alias("reading"),
          col("timestamp")
          ) \
  .dropDuplicates(["id", "reading", "timestamp"]).show(truncate=False)

或者使用分组方式:

df.groupBy("id", "timestamp").agg(first("reading").alias("reading"), count("*").alias("cn")) \
  .withColumn("reading", when(col("cn") > lit(1), lit(None)).otherwise(col("reading"))) \
  .select(*df.columns) \
  .show(truncate=False)

给予:

+---+-------+-----------------------+
|id |reading|timestamp              |
+---+-------+-----------------------+
|1  |null   |2018-03-22 09:00:00.000|
|1  |13015  |2018-03-22 08:00:00.000|
|2  |18000  |2018-03-22 10:00:00.000|
|2  |15700  |2018-03-22 08:00:00.000|
|2  |16700  |2018-03-22 09:00:00.000|
+---+-------+-----------------------+

【讨论】:

【参考方案2】:

在 Scala 上可以通过分组来完成,并且在 count 大于 1 时将“读取”值替换为 null:

val df = Seq(
  (1, 13015, "2018-03-22 08:00:00.000"),
  (1, 14550, "2018-03-22 09:00:00.000"),
  (1, 14570, "2018-03-22 09:00:00.000"),
  (2, 15700, "2018-03-22 08:00:00.000"),
  (2, 16700, "2018-03-22 09:00:00.000"),
  (2, 18000, "2018-03-22 10:00:00.000")
).toDF("id", "reading", "timestamp")

// action
df
  .groupBy("id", "timestamp")
  .agg(
    min("reading").alias("reading"),
    count("reading").alias("readingCount")
  )
  .withColumn("reading", when($"readingCount" > 1, null).otherwise($"reading"))
  .drop("readingCount")

输出是:

+---+-----------------------+-------+
|id |timestamp              |reading|
+---+-----------------------+-------+
|2  |2018-03-22 09:00:00.000|16700  |
|1  |2018-03-22 08:00:00.000|13015  |
|1  |2018-03-22 09:00:00.000|null   |
|2  |2018-03-22 10:00:00.000|18000  |
|2  |2018-03-22 08:00:00.000|15700  |
+---+-----------------------+-------+

猜猜,可以很容易地转换成 Python。

【讨论】:

以上是关于如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何删除 DataFrame 中的非数字列?

如何从 pyspark 的结构字段中删除 NULL?

如何从 PySpark DataFrame 中获取随机行?

如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?

使用 PySpark 删除 Dataframe 的嵌套列

如何从 Pyspark Dataframe 中的字符串列中过滤字母值?