在另一列上查找最近的时间戳并在新列中添加值 PySpark
Posted
技术标签:
【中文标题】在另一列上查找最近的时间戳并在新列中添加值 PySpark【英文标题】:Find nearest Timestamp on another column and add value in a new column PySpark 【发布时间】:2021-11-22 19:19:25 【问题描述】:我有一个包含两个 TimeStamp 列的合并 DataFrame。我想找到最近的(前向)时间戳(Timestamp1 -> Timestamp2)并获取关联的值并将其添加到新列中。
TimeStamp1 Value1 TimeStamp2 Value2
2021-11-01T01:55:29.473 131 2021-11-01T01:55:28.205 A
2021-11-01T01:55:30.474 3 2021-11-01T01:55:31.205 B
2021-11-01T05:01:55.247 195 2021-11-01T03:44:14.208 C
2021-11-01T05:01:56.247 67 2021-11-01T05:41:56.205 D
2021-11-01T09:41:30.264 131 2021-11-01T09:41:29.405 E
2021-11-01T09:41:32.264 67 2021-11-01T09:41:35.205 F
预期输出:
TimeStamp1 Value1 Value 2
2021-11-01T01:55:29.473 131 B
2021-11-01T01:55:30.474 3 B
2021-11-01T05:01:55.247 195 D
2021-11-01T05:01:56.247 67 D
2021-11-01T09:41:30.264 131 F
2021-11-01T09:41:32.264 67 F
我正在使用 PySpark,我检查了一些方法,但在 pandas 中。
【问题讨论】:
【参考方案1】:您正在寻找的转换可以通过两个步骤实现:
-
使用
self join
生成所有可能的组合,其中df["TimeStamp2"] >= df[TimeStamp1"]
。这形成了我们的candidate_df
。
我们修剪candidate_df
以通过在包含TimeStamp1
的行中找到包含最小TimeStamp2
的行来检索预期的行。我们这样做是按TimeStamp1
对candidate_df
进行分区,然后按TimeStamp2
升序排序并返回第一行。
如果您对
TimeStamp1
和nearest TimeStamp2
之间的“最大接近度”(即)最大差异有阈值,则可以优化解决方案以减小candidate_df
的大小。
工作示例
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [(datetime.strptime("2021-11-01T01:55:29.473", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T01:55:28.205", "%Y-%m-%dT%H:%M:%S.%f"), "A"),
(datetime.strptime("2021-11-01T01:55:30.474", "%Y-%m-%dT%H:%M:%S.%f"), 3, datetime.strptime("2021-11-01T01:55:31.205", "%Y-%m-%dT%H:%M:%S.%f"), "B"),
(datetime.strptime("2021-11-01T05:01:55.247", "%Y-%m-%dT%H:%M:%S.%f"), 195, datetime.strptime("2021-11-01T03:44:14.208", "%Y-%m-%dT%H:%M:%S.%f"), "C"),
(datetime.strptime("2021-11-01T05:01:56.247", "%Y-%m-%dT%H:%M:%S.%f"), 67, datetime.strptime("2021-11-01T05:41:56.205", "%Y-%m-%dT%H:%M:%S.%f"), "D"),
(datetime.strptime("2021-11-01T09:41:30.264", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T09:41:29.405", "%Y-%m-%dT%H:%M:%S.%f"), "E"),
(datetime.strptime("2021-11-01T09:41:32.264", "%Y-%m-%dT%H:%M:%S.%f"), 67, datetime.strptime("2021-11-01T09:41:35.205", "%Y-%m-%dT%H:%M:%S.%f"), "F"),]
df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2",))
candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
.selectExpr("l.TimeStamp1 as TimeStamp1",
"l.Value1 as Value1",
"r.TimeStamp2 as TimeStamp2",
"r.Value2 as Value2")
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")
candidate_df.withColumn("rn" ,F.row_number().over(window_spec))\
.filter(F.col("rn") == 1)\
.drop("rn", "TimeStamp2")\
.show(200, False)
输出
+-----------------------+------+------+
|TimeStamp1 |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131 |B |
|2021-11-01 01:55:30.474|3 |B |
|2021-11-01 05:01:55.247|195 |D |
|2021-11-01 05:01:56.247|67 |D |
|2021-11-01 09:41:30.264|131 |F |
|2021-11-01 09:41:32.264|67 |F |
+-----------------------+------+------+
【讨论】:
10/10,非常感谢!以上是关于在另一列上查找最近的时间戳并在新列中添加值 PySpark的主要内容,如果未能解决你的问题,请参考以下文章