如何创建与列相关的大小的 Pyspark 窗口函数
Posted
技术标签:
【中文标题】如何创建与列相关的大小的 Pyspark 窗口函数【英文标题】:How to create a Pyspark window function of Column-dependent size 【发布时间】:2020-12-16 12:56:00 【问题描述】:我在尝试使用 pyspark 中的窗口函数在与列相关的先前行数上累积值时遇到意外错误。
重现我遇到的错误的最小工作示例 (MWE) 如下:
from pyspark.sql import Window
import pandas as pd
df = sqlContext.createDataFrame( [("A", 0.1, pd.datetime(2020,12,1), 0),
("A", 2.1, pd.datetime(2020,12,5), 3),
("A", 1.1, pd.datetime(2020,12,7), 1),
("A", 3.1, pd.datetime(2020,12,9), 3),
],
["id", "value","timestamp", "previous_rows_to_consider"] )
df.show()
# +---+-----+-------------------+-------------------------+
# | id|value| timestamp|previous_rows_to_consider|
# +---+-----+-------------------+-------------------------+
# | A| 0.1|2020-12-01 00:00:00| 0|
# | A| 2.1|2020-12-05 00:00:00| 3|
# | A| 1.1|2020-12-07 00:00:00| 1|
# | A| 3.1|2020-12-09 00:00:00| 3|
# +---+-----+-------------------+-------------------------+
import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy( F.col('timestamp') ).rowsBetween( -F.col('previous_rows_to_consider'),0 )
df = df.withColumn('value_cumsum_on_previous_rows', F.sum('value').over(w) )
df.show()
产生ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
。
我还尝试了一些解决方法,使用rangeBetween
而不是rowsBetween
,但我得到了同样的错误。
正如@murtihash 在相关的question 中指出的那样,我确实怀疑rowsBetween
/rangeBetween
根本不接受列类型作为输入,但在一些在线资源上我发现至少@ 987654329@ 应该(参见例如提供的概述here)。
rangeBetween
和 rowsBetween
只接受整数值作为输入?
如果是后者,谁能建议一种解决方法来计算与列相关的范围/行数上的累积总和?
【问题讨论】:
【参考方案1】:确实,您不能将列放入范围/行窗口中。作为一种解决方法(受How to calculate rolling sum with varying window sizes in PySpark 启发):
df2 = df.withColumn(
'value_list',
F.reverse(
F.collect_list('value').over(Window.partitionBy('id').orderBy('timestamp'))
)
).withColumn(
'cumsum',
F.expr('''aggregate(
slice(value_list, 1, previous_rows_to_consider + 1),
cast(0 as double),
(x, y) -> x + y
)''')
)
df2.show()
+---+-----+-------------------+-------------------------+--------------------+------+
| id|value| timestamp|previous_rows_to_consider| value_list|cumsum|
+---+-----+-------------------+-------------------------+--------------------+------+
| A| 0.1|2020-12-01 00:00:00| 0| [0.1]| 0.1|
| A| 2.1|2020-12-05 00:00:00| 3| [2.1, 0.1]| 2.2|
| A| 1.1|2020-12-07 00:00:00| 1| [1.1, 2.1, 0.1]| 3.2|
| A| 3.1|2020-12-09 00:00:00| 3|[3.1, 1.1, 2.1, 0.1]| 6.4|
+---+-----+-------------------+-------------------------+--------------------+------+
请注意,此解决方案适用于前面有行的窗口。如果您需要以下行,请删除 F.reverse
。
【讨论】:
我试图通过将F.expr
的输入转换为“纯pyspark”来更好地理解单个步骤。我设法理解了该命令的大部分功能。但是,我无法将该步骤转换为“纯 pyspark”,因为 (1) 在 this answer slice
之后仅使用 F.expr
接受列输入; (2) 我找不到“纯pyspark”方式来总结不诉诸F.expr('aggregate(...)')
的数组列的元素。
Pyspark 不支持所有的 Spark SQL 函数,并且当它支持时,有时它不支持使用列作为参数。不幸的是,这是 pyspark 的一个限制,但幸运的是我们总是可以使用 F.expr
另外,因为我想它可能对像我这样不熟悉F.expr
的人有用,让我在这里添加我对它的用法的理解:(1)slice
只取第一个value_list 的 previous_rows_to_consider 元素,(2)aggregate
,与(x, y) -> x + y
第三个参数总结了结果列的元素,(3)我猜@987654334 @ 要么指定聚合函数产生的列的类型,要么指定如何处理 0/null 值)。如果我弄错了,请告诉我。
我认为你是对的。在没有演员表的情况下尝试一下,看看你得到了什么错误;)这会告诉你演员表的用途
嗯,我试过了,但我无法真正弄清楚。看起来cast
中的值就像一种偏移量(如果我将 0 更改为标量值 x,列 cumsum 会被移动x)。无论如何,在这一点上对我来说一切都很好。非常感谢@mck!以上是关于如何创建与列相关的大小的 Pyspark 窗口函数的主要内容,如果未能解决你的问题,请参考以下文章
如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?
如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数