使用 pyspark 跟踪和查找数据框中的最新值

Posted

技术标签:

【中文标题】使用 pyspark 跟踪和查找数据框中的最新值【英文标题】:tracking and finding latest value in dataframe using pyspark 【发布时间】:2021-05-28 12:36:15 【问题描述】:

我是 Pyspark 的新手,我遇到了需要解决它的情况。有人可以帮助解决它。我检查并尝试在谷歌和堆栈溢出中找到类似的问题。但不幸的是,我没有得到它。

问题:

我的数据框包含两列,一列已过时,另一列替换。

数据帧:

在上面的数据框中,绝对值在替换列中得到更新。例如:这里 10 变成 12,在下一行 12 变成 14,第 3 行再次第 14 个值变成 16。如果您看到下一行中的值得到更新

所以前三个值成为一组,因为链值正在更新。以红色突出显示,因此对于那些过时的值,替换中的最后一个值是 16 的最新值。对于其他两行,19 是连接值,并以黄色突出显示,因此这两行的最新值为 20。

预期输出

我尝试在 pyspark 中使用 map 和 foreach,但没有得到期望的结果。一些请帮助我如何解决这个问题。

【问题讨论】:

【参考方案1】:

首先,您必须发现所有链并为它们创建一个组。分组后,您可以应用f.last()函数返回所需的值。

from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as f


df = spark.createDataFrame([
  (10, 12),
  (12, 14),
  (14, 16),
  (18, 19),
  (19, 20),
  (22, 24),
  (24, 25),
  (25, 27),
  (29, 30)
], ('obsolute', 'replace'))

w = Window.orderBy('obsolute')
df = (df
      .withColumn('chain', f.coalesce(f.lag('replace').over(w) == f.col('obsolute'), f.lit(True)))
      .withColumn('group', f.sum((f.col('chain') == f.lit(False)).cast('Int')).over(w)))
# +--------+-------+-----+-----+
# |obsolute|replace|chain|group|
# +--------+-------+-----+-----+
# |10      |12     |true |0    |
# |12      |14     |true |0    |
# |14      |16     |true |0    |
# |18      |19     |false|1    |
# |19      |20     |true |1    |
# |22      |24     |false|2    |
# |24      |25     |true |2    |
# |25      |27     |true |2    |
# |29      |30     |false|3    |
# +--------+-------+-----+-----+


w = Window.partitionBy('group')
df = df.select('obsolute', 'replace', f.last('replace').over(w).alias('latest'))

df.show(truncate=False)

输出

+--------+-------+------+
|obsolute|replace|latest|
+--------+-------+------+
|10      |12     |16    |
|12      |14     |16    |
|14      |16     |16    |
|18      |19     |20    |
|19      |20     |20    |
|22      |24     |27    |
|24      |25     |27    |
|25      |27     |27    |
|29      |30     |30    |
+--------+-------+------+

【讨论】:

以上是关于使用 pyspark 跟踪和查找数据框中的最新值的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 将最小值添加回数据框

如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来

提取特定单元格的值并将其填充以代替 pyspark 数据框中的 NA 值

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?

使用较低的函数将pyspark数据框中单列中的值转换为文本清理中的小写[重复]