如何从 Pyspark / Python 数据集中先前计算的列中获取值

Posted

技术标签:

【中文标题】如何从 Pyspark / Python 数据集中先前计算的列中获取值【英文标题】:How to get value from previous calculated column in Pyspark / Python data set 【发布时间】:2020-07-29 14:55:57 【问题描述】:

我正在尝试在 Pyspark / Python 表中创建一个新列 (B)。 新列(B)是:列(A)的当前值+列(B)的先前值

desired output example image

`Id   a     b
1    977   977
2    3665  4642
3    1746  6388
4    2843  9231
5    200   9431`

当前列 B = 当前列 A + 先前列 B ; 例如第 4 行:9231(B 列)= 2843(A 列)+ 6388(以前的 B 列值)

(对于第一行,因为 B 没有先前的值,所以它是 0)

请帮助我使用 Python / PySpark 查询代码

【问题讨论】:

具体是什么问题?请参阅How to Ask、help center。 【参考方案1】:

如果没有上下文我可能是错的,但似乎你试图做 A 列的累积总和:

from pyspark.sql.window import Window
import pyspark.sql.functions as sf

df = df.withColumn('B', sf.sum(df.A).over(Window.partitionBy().orderBy().rowsBetween(
Window.unboundedPreceding, 0)))

编辑:

如果您需要根据 B 的最后一个值迭代地添加新行并假设数据框中 B 的值在此期间没有变化,我认为您最好将 B 记住在标准 python 变量中并构建下一行。

previous_B = 0
# your code to get new A
previous_B += new_A
new_row = spark.createDataFrame([(new_A, previous_B)])
df = df.union(new_row)

【讨论】:

其实A列总是有一个新值,B列是A列值+B列旧值的总和 好的,因此您逐行构建表格,并希望在获得 A 列的所有行之前更新 B 列。 是的,我正在尝试逐行构建表格,其中我已经拥有 A 的值,但我通过调用 B 的最后一个值来创建 B 的值 所以基本上每次您需要选择数据框的最后一行,然后选择该行中的 B 列。你可以用火花查询来做到这一点,但据我所知,你想做的事情可能会很复杂。 Spark 数据帧的工作方式类似于数据库,并非用于访问某些特定的行或元素,如普通数组。所以我认为最好在其他地方简单地记住最后一个 B 值。我已经用另一个解决方案编辑了我的答案。

以上是关于如何从 Pyspark / Python 数据集中先前计算的列中获取值的主要内容,如果未能解决你的问题,请参考以下文章

加入 PySpark 数据集中每个月的上个月数据

如何从 python 复制 pyspark / hadoop 中的文件

在 zeppelin 中使用从 %pyspark 到 %python 的 Dataframe

使用键名过滤pyspark中的字典

如何使用 pyspark 从 python 列表中选择随机文本值?

Python - 如何从三个数据集中查找不匹配的记录