如何仅选择至少一个值与前一行发生更改的行

Posted

技术标签:

【中文标题】如何仅选择至少一个值与前一行发生更改的行【英文标题】:How to select only rows where at least one value has changed from the previous row 【发布时间】:2020-11-03 12:54:17 【问题描述】:

如果我有一个每天将值跟踪为快照的表,我如何才能只选择每个 ID 至少更改一个值的行,当然不包括日期。每个 ID 的第一行也应始终包含在结果中。

例如,用这个表:

id    phone_number    email          date
1     12345           a@gmail.com    2020-01-01
1     12345           a@gmail.com    2020-01-02
1     23456           a@gmail.com    2020-01-03
1     34567           a@gmail.com    2020-01-04
1     34567           a@gmail.com    2020-01-05
1     45678           a@gmail.com    2020-01-06
1     45678           a@gmail.com    2020-01-07
2     56789           b@gmail.com    2020-01-01
2     56789           b@gmail.com    2020-01-02
2     56789           c@gmail.com    2020-01-03
2     67890           c@gmail.com    2020-01-04
2     67890           c@gmail.com    2020-01-05
3     78901           d@gmail.com    2020-01-01
3     78901           d@gmail.com    2020-01-02
3     78901           d@gmail.com    2020-01-03

会返回这个结果:

id    phone_number    email          date
1     12345           a@gmail.com    2020-01-01
1     23456           a@gmail.com    2020-01-03
1     34567           a@gmail.com    2020-01-04
1     45678           a@gmail.com    2020-01-06
2     56789           b@gmail.com    2020-01-01
2     56789           c@gmail.com    2020-01-03
2     67890           c@gmail.com    2020-01-04
3     78901           d@gmail.com    2020-01-01

【问题讨论】:

您能否检查解决方案并帮助批准和投票,以防它对您有用.. 将不胜感激 如果解决方案对您有用,您能帮忙接受答案吗? @dsk 如果你能将我的 SQL 答案翻译成 PySpark 函数,我会接受你的答案 【参考方案1】:

IIUC,您需要使用带有 row_number() 的窗口函数并且只取第一个值-

在此处创建 DF

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window as W
df = spark.createDataFrame([(1,12345,"a@gmail.com","2020-01-01"),
                           (1,12345,"a@gmail.com","2020-01-02"),
                           (1,23456,"a@gmail.com","2020-01-03"),
                           (1,34567,"a@gmail.com","2020-01-04"),
                           (1,34567,"a@gmail.com","2020-01-05"),
                           (1,45678,"a@gmail.com","2020-01-06"),
                           (1,45678,"a@gmail.com","2020-01-07"),
                           (2,56789,"b@gmail.com","2020-01-01"),
                           (2,56789,"b@gmail.com","2020-01-02"),
                           (2,56789,"c@gmail.com","2020-01-03"),
                           (2,67890,"c@gmail.com","2020-01-04"),
                           (2,67890,"c@gmail.com","2020-01-05"),
                           (3,78901,"d@gmail.com","2020-01-01"),
                           (3,78901,"d@gmail.com","2020-01-02"),
                           (3,78901,"d@gmail.com","2020-01-03"),
                           ],[ "id","phone_number","email","date"])
df.show() 

输出

+---+------------+-----------+----------+
| id|phone_number|      email|      date|
+---+------------+-----------+----------+
|  1|       12345|a@gmail.com|2020-01-01|
|  1|       12345|a@gmail.com|2020-01-02|
|  1|       23456|a@gmail.com|2020-01-03|
|  1|       34567|a@gmail.com|2020-01-04|
|  1|       34567|a@gmail.com|2020-01-05|
|  1|       45678|a@gmail.com|2020-01-06|
|  1|       45678|a@gmail.com|2020-01-07|
|  2|       56789|b@gmail.com|2020-01-01|
|  2|       56789|b@gmail.com|2020-01-02|
|  2|       56789|c@gmail.com|2020-01-03|
|  2|       67890|c@gmail.com|2020-01-04|
|  2|       67890|c@gmail.com|2020-01-05|
|  3|       78901|d@gmail.com|2020-01-01|
|  3|       78901|d@gmail.com|2020-01-02|
|  3|       78901|d@gmail.com|2020-01-03|
+---+------------+-----------+----------+

这里的逻辑

_w = W.partitionBy("id","phone_number","email").orderBy("date")
df = df.withColumn("rnk", F.row_number().over(_w))
df = df.filter(F.col("rnk") ==F.lit(1))
df = df.orderBy("id", "phone_number")
df.show()
+---+------------+-----------+----------+---+
| id|phone_number|      email|      date|rnk|
+---+------------+-----------+----------+---+
|  1|       12345|a@gmail.com|2020-01-01|  1|
|  1|       23456|a@gmail.com|2020-01-03|  1|
|  1|       34567|a@gmail.com|2020-01-04|  1|
|  1|       45678|a@gmail.com|2020-01-06|  1|
|  2|       56789|c@gmail.com|2020-01-03|  1|
|  2|       56789|b@gmail.com|2020-01-01|  1|
|  2|       67890|c@gmail.com|2020-01-04|  1|
|  3|       78901|d@gmail.com|2020-01-01|  1|
+---+------------+-----------+----------+---+

【讨论】:

谢谢 - 这几乎可以工作,但如果一个值变回之前已经看到的值,它会丢失行。例如,如果您将原始数据帧的第 5 行更改为 (1, 12345, "a@gmail.com", "2020-01-05"),则结果中将缺少此内容。你能想出解决办法吗?【参考方案2】:

我已经把它作为一个 SQL 查询来运行,我可以这样运行:

df.createOrReplaceTempView("df")

df = spark.sql(
    """
    SELECT  a.*
    FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
    LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
    ON a.row = b.row + 1 AND a.id = b.id 
    WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
    """
)

这很好用,现在我只想把它翻译成原生 PySpark 代码。

【讨论】:

为了在 PySpark 中转换它,我需要清楚地知道逻辑和预期的输出 - 看起来怎么样?【参考方案3】:
select distinct  phone_number,email from table

【讨论】:

我也需要包含 ID 和日期

以上是关于如何仅选择至少一个值与前一行发生更改的行的主要内容,如果未能解决你的问题,请参考以下文章

如何检查新的输入行是不是与前一行(数组)冗余

如何将 microsoft access - 报告详细信息部分布局与前一个重叠以避免多余的行?

MySQL 日期时间与前一行的比较

仅选择值更改的行

选择字段中发生更改的行并将它们连接到另一个表

如何复制多行(Oracle)