在另一列上查找最近的时间戳并在新列中添加值 PySpark

Posted

技术标签:

【中文标题】在另一列上查找最近的时间戳并在新列中添加值 PySpark【英文标题】:Find nearest Timestamp on another column and add value in a new column PySpark 【发布时间】:2021-11-22 19:19:25 【问题描述】:

我有一个包含两个 TimeStamp 列的合并 DataFrame。我想找到最近的(前向)时间戳(Timestamp1 -> Timestamp2)并获取关联的值并将其添加到新列中。

TimeStamp1                Value1     TimeStamp2               Value2
2021-11-01T01:55:29.473   131        2021-11-01T01:55:28.205  A
2021-11-01T01:55:30.474   3          2021-11-01T01:55:31.205  B
2021-11-01T05:01:55.247   195        2021-11-01T03:44:14.208  C
2021-11-01T05:01:56.247   67         2021-11-01T05:41:56.205  D
2021-11-01T09:41:30.264   131        2021-11-01T09:41:29.405  E
2021-11-01T09:41:32.264   67         2021-11-01T09:41:35.205  F

预期输出:

TimeStamp1                Value1     Value 2 
2021-11-01T01:55:29.473   131        B
2021-11-01T01:55:30.474   3          B
2021-11-01T05:01:55.247   195        D
2021-11-01T05:01:56.247   67         D
2021-11-01T09:41:30.264   131        F
2021-11-01T09:41:32.264   67         F

我正在使用 PySpark,我检查了一些方法,但在 pandas 中。

【问题讨论】:

【参考方案1】:

您正在寻找的转换可以通过两个步骤实现:

    使用self join 生成所有可能的组合,其中df["TimeStamp2"] >= df[TimeStamp1"]。这形成了我们的candidate_df。 我们修剪candidate_df 以通过在包含TimeStamp1 的行中找到包含最小TimeStamp2 的行来检索预期的行。我们这样做是按TimeStamp1candidate_df 进行分区,然后按TimeStamp2 升序排序并返回第一行。

如果您对TimeStamp1nearest TimeStamp2 之间的“最大接近度”(即)最大差异有阈值,则可以优化解决方案以减小candidate_df 的大小。

工作示例


from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(datetime.strptime("2021-11-01T01:55:29.473", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T01:55:28.205", "%Y-%m-%dT%H:%M:%S.%f"), "A"),
(datetime.strptime("2021-11-01T01:55:30.474", "%Y-%m-%dT%H:%M:%S.%f"), 3,   datetime.strptime("2021-11-01T01:55:31.205", "%Y-%m-%dT%H:%M:%S.%f"), "B"),
(datetime.strptime("2021-11-01T05:01:55.247", "%Y-%m-%dT%H:%M:%S.%f"), 195, datetime.strptime("2021-11-01T03:44:14.208", "%Y-%m-%dT%H:%M:%S.%f"), "C"),
(datetime.strptime("2021-11-01T05:01:56.247", "%Y-%m-%dT%H:%M:%S.%f"), 67,  datetime.strptime("2021-11-01T05:41:56.205", "%Y-%m-%dT%H:%M:%S.%f"), "D"),
(datetime.strptime("2021-11-01T09:41:30.264", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T09:41:29.405", "%Y-%m-%dT%H:%M:%S.%f"), "E"),
(datetime.strptime("2021-11-01T09:41:32.264", "%Y-%m-%dT%H:%M:%S.%f"), 67,  datetime.strptime("2021-11-01T09:41:35.205", "%Y-%m-%dT%H:%M:%S.%f"), "F"),]

df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2",))

candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
                 .selectExpr("l.TimeStamp1 as TimeStamp1", 
                             "l.Value1 as Value1", 
                             "r.TimeStamp2 as TimeStamp2", 
                             "r.Value2 as Value2")

window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")

candidate_df.withColumn("rn" ,F.row_number().over(window_spec))\
            .filter(F.col("rn") == 1)\
            .drop("rn", "TimeStamp2")\
            .show(200, False)

输出

+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+

【讨论】:

10/10,非常感谢!

以上是关于在另一列上查找最近的时间戳并在新列中添加值 PySpark的主要内容,如果未能解决你的问题,请参考以下文章

pandas:查找部分字符串并在新列中使用它

在 Pandas 数据框中找到最小值并在新列上添加标签

检查panda数据帧中的多个列是否重合并在新列中标记它们

从文件名中提取时间戳并使用 Pig 将其添加到新列(例如日期)中

从不同表的另一列中的一列中查找所有值

Pandas:根据字符串的一部分是不是在另一列中的任何位置创建新列