使用 pyspark 跟踪和查找数据框中的最新值
Posted
技术标签:
【中文标题】使用 pyspark 跟踪和查找数据框中的最新值【英文标题】:tracking and finding latest value in dataframe using pyspark 【发布时间】:2021-05-28 12:36:15 【问题描述】:我是 Pyspark 的新手,我遇到了需要解决它的情况。有人可以帮助解决它。我检查并尝试在谷歌和堆栈溢出中找到类似的问题。但不幸的是,我没有得到它。
问题:
我的数据框包含两列,一列已过时,另一列替换。
数据帧:
在上面的数据框中,绝对值在替换列中得到更新。例如:这里 10 变成 12,在下一行 12 变成 14,第 3 行再次第 14 个值变成 16。如果您看到下一行中的值得到更新
所以前三个值成为一组,因为链值正在更新。以红色突出显示,因此对于那些过时的值,替换中的最后一个值是 16 的最新值。对于其他两行,19 是连接值,并以黄色突出显示,因此这两行的最新值为 20。
预期输出
我尝试在 pyspark 中使用 map 和 foreach,但没有得到期望的结果。一些请帮助我如何解决这个问题。
【问题讨论】:
【参考方案1】:首先,您必须发现所有链并为它们创建一个组。分组后,您可以应用f.last()
函数返回所需的值。
from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df = spark.createDataFrame([
(10, 12),
(12, 14),
(14, 16),
(18, 19),
(19, 20),
(22, 24),
(24, 25),
(25, 27),
(29, 30)
], ('obsolute', 'replace'))
w = Window.orderBy('obsolute')
df = (df
.withColumn('chain', f.coalesce(f.lag('replace').over(w) == f.col('obsolute'), f.lit(True)))
.withColumn('group', f.sum((f.col('chain') == f.lit(False)).cast('Int')).over(w)))
# +--------+-------+-----+-----+
# |obsolute|replace|chain|group|
# +--------+-------+-----+-----+
# |10 |12 |true |0 |
# |12 |14 |true |0 |
# |14 |16 |true |0 |
# |18 |19 |false|1 |
# |19 |20 |true |1 |
# |22 |24 |false|2 |
# |24 |25 |true |2 |
# |25 |27 |true |2 |
# |29 |30 |false|3 |
# +--------+-------+-----+-----+
w = Window.partitionBy('group')
df = df.select('obsolute', 'replace', f.last('replace').over(w).alias('latest'))
df.show(truncate=False)
输出
+--------+-------+------+
|obsolute|replace|latest|
+--------+-------+------+
|10 |12 |16 |
|12 |14 |16 |
|14 |16 |16 |
|18 |19 |20 |
|19 |20 |20 |
|22 |24 |27 |
|24 |25 |27 |
|25 |27 |27 |
|29 |30 |30 |
+--------+-------+------+
【讨论】:
以上是关于使用 pyspark 跟踪和查找数据框中的最新值的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来
提取特定单元格的值并将其填充以代替 pyspark 数据框中的 NA 值
如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列