Pyspark - 如何检查两条记录中哪一条具有最新日期及其列值?

Posted

技术标签:

【中文标题】Pyspark - 如何检查两条记录中哪一条具有最新日期及其列值?【英文标题】:Pyspark - how to check which of the two records has the latest date and its column value? 【发布时间】:2020-06-24 12:08:53 【问题描述】:

我有一个 DataFrame 并想检查具有最新日期的记录是否具有特定值。例如,对于下面的 DF,我需要检查 ID 为 'B30' 的记录并获取 metric_1 == 0.25 的记录。

original_metrics_df = self.spark.createDataFrame(
    [('A10', -0.35, '2020-01-04'),
     ('A20', -0.20, '2017-05-01'),
     ('B30', 0.59, '2018-02-08'),
     ('B30', 0.25, '2019-05-01')
     ],
    ['id', 'metric_1', 'transaction_date']
)

首先我进行过滤以仅保留 ID 为 B30 的记录:

filtered_metrics_df = original_metrics_df.select('id','metric_1').filter(F.col('metric_1') == 'B30')

我应该如何进一步应用过滤以仅获取此记录 ('B30', 0.25, '2019-05-01')metric_1 值?

【问题讨论】:

@anky transaction_date 列中没有重复项 【参考方案1】:

你可以在窗户上试一试:

如果 transaction_date 不是日期列转换为日期:

original_metrics_df = original_metrics_df.withColumn("transaction_date",
                                F.to_date("transaction_date"))

然后我们可以在一个窗口中获取最大日期并选择所需的条件:

w = Window.partitionBy("id")
cond = (F.col("id")=="B30") & (F.col("transaction_date")==F.col("Latest_date"))

(original_metrics_df.withColumn("Latest_date",F.max("transaction_date").over(w))
 .filter(cond).drop("Latest_date")).show()

+---+--------+----------------+
| id|metric_1|transaction_date|
+---+--------+----------------+
|B30|    0.25|      2019-05-01|
+---+--------+----------------+

或者,先过滤已知条件,再做同样的操作:

w = Window.partitionBy("id")

(original_metrics_df.filter(F.col("id")=="B30")
  .withColumn("Latest_date",F.max("transaction_date").over(w))
  .filter(F.col("transaction_date")==F.col("Latest_date"))).show()

【讨论】:

以上是关于Pyspark - 如何检查两条记录中哪一条具有最新日期及其列值?的主要内容,如果未能解决你的问题,请参考以下文章

在Arcgis中断开的道路线如何连接起来?

找出后缀数组的两种算法中哪一种更快,为啥?

编码内部联接的两种方法中哪一种更快?

从 S3 加载 spark DF,多个文件。这些方法中哪一种最好?

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?

事务和锁--查看数据库中的锁