如何创建与列相关的大小的 Pyspark 窗口函数

Posted

技术标签:

【中文标题】如何创建与列相关的大小的 Pyspark 窗口函数【英文标题】:How to create a Pyspark window function of Column-dependent size 【发布时间】:2020-12-16 12:56:00 【问题描述】:

我在尝试使用 pyspark 中的窗口函数在与列相关的先前行数上累积值时遇到意外错误。

重现我遇到的错误的最小工作示例 (MWE) 如下:

from pyspark.sql import Window
import pandas as pd

df = sqlContext.createDataFrame( [("A", 0.1, pd.datetime(2020,12,1), 0),
                                  ("A", 2.1, pd.datetime(2020,12,5), 3), 
                                  ("A", 1.1, pd.datetime(2020,12,7), 1), 
                                  ("A", 3.1, pd.datetime(2020,12,9), 3), 
                                 ],
                                 ["id", "value","timestamp", "previous_rows_to_consider"] )
df.show()
# +---+-----+-------------------+-------------------------+
# | id|value|          timestamp|previous_rows_to_consider|
# +---+-----+-------------------+-------------------------+
# |  A|  0.1|2020-12-01 00:00:00|                        0|
# |  A|  2.1|2020-12-05 00:00:00|                        3|
# |  A|  1.1|2020-12-07 00:00:00|                        1|
# |  A|  3.1|2020-12-09 00:00:00|                        3|
# +---+-----+-------------------+-------------------------+

import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy( F.col('timestamp') ).rowsBetween( -F.col('previous_rows_to_consider'),0 )

df = df.withColumn('value_cumsum_on_previous_rows', F.sum('value').over(w) )
df.show()

产生ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

我还尝试了一些解决方法,使用rangeBetween 而不是rowsBetween,但我得到了同样的错误。

正如@murtihash 在相关的question 中指出的那样,我确实怀疑rowsBetween/rangeBetween 根本不接受列类型作为输入,但在一些在线资源上我发现至少@ 987654329@ 应该(参见例如提供的概述here)。

是否有人可以了解是什么阻止了我的 MWE,或者确认 rangeBetweenrowsBetween 只接受整数值作为输入? 如果是后者,谁能建议一种解决方法来计算与列相关的范围/行数上的累积总和?

【问题讨论】:

【参考方案1】:

确实,您不能将列放入范围/行窗口中。作为一种解决方法(受How to calculate rolling sum with varying window sizes in PySpark 启发):

df2 = df.withColumn(
    'value_list', 
    F.reverse(
        F.collect_list('value').over(Window.partitionBy('id').orderBy('timestamp'))
    )
).withColumn(
    'cumsum',
    F.expr('''aggregate(
        slice(value_list, 1, previous_rows_to_consider + 1),
        cast(0 as double),
        (x, y) -> x + y
    )''')
)

df2.show()
+---+-----+-------------------+-------------------------+--------------------+------+
| id|value|          timestamp|previous_rows_to_consider|          value_list|cumsum|
+---+-----+-------------------+-------------------------+--------------------+------+
|  A|  0.1|2020-12-01 00:00:00|                        0|               [0.1]|   0.1|
|  A|  2.1|2020-12-05 00:00:00|                        3|          [2.1, 0.1]|   2.2|
|  A|  1.1|2020-12-07 00:00:00|                        1|     [1.1, 2.1, 0.1]|   3.2|
|  A|  3.1|2020-12-09 00:00:00|                        3|[3.1, 1.1, 2.1, 0.1]|   6.4|
+---+-----+-------------------+-------------------------+--------------------+------+

请注意,此解决方案适用于前面有行的窗口。如果您需要以下行,请删除 F.reverse

【讨论】:

我试图通过将F.expr 的输入转换为“纯pyspark”来更好地理解单个步骤。我设法理解了该命令的大部分功能。但是,我无法将该步骤转换为“纯 pyspark”,因为 (1) 在 this answer slice 之后仅使用 F.expr 接受列输入; (2) 我找不到“纯pyspark”方式来总结不诉诸F.expr('aggregate(...)') 的数组列的元素。 Pyspark 不支持所有的 Spark SQL 函数,并且当它支持时,有时它不支持使用列作为参数。不幸的是,这是 pyspark 的一个限制,但幸运的是我们总是可以使用 F.expr 另外,因为我想它可能对像我这样不熟悉F.expr 的人有用,让我在这里添加我对它的用法的理解:(1)slice 只取第一个value_listprevious_rows_to_consider 元素,(2)aggregate,与(x, y) -> x + y 第三个参数总结了结果列的元素,(3)我猜@987654334 @ 要么指定聚合函数产生的列的类型,要么指定如何处理 0/null 值)。如果我弄错了,请告诉我。 我认为你是对的。在没有演员表的情况下尝试一下,看看你得到了什么错误;)这会告诉你演员表的用途 嗯,我试过了,但我无法真正弄清楚。看起来cast 中的值就像一种偏移量(如果我将 0 更改为标量值 x,列 cumsum 会被移动x)。无论如何,在这一点上对我来说一切都很好。非常感谢@mck!

以上是关于如何创建与列相关的大小的 Pyspark 窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark 中计算不同窗口大小的滚动总和

如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?

如何在 PySpark 中使用窗口函数?

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

PySpark UDF 在与列分开的情况下返回状态代码和响应