如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?

Posted

技术标签:

【中文标题】如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?【英文标题】:How to count the number of adjacent values in a Pyspark Dataframe equal to a certain value using a rolling window function? 【发布时间】:2020-01-17 09:39:56 【问题描述】:

可以使用以下方法创建示例数据框:

    from pyspark.sql.functions import col
    from pyspark.sql.window import Window

    df = sc.parallelize([['2019-08-29 01:00:00',0],
                          ['2019-08-29 02:00:00',0],
                          ['2019-08-29 03:00:00',0],
                          ['2019-08-29 04:00:00',1],
                          ['2019-08-29 05:00:00',2],
                          ['2019-08-29 06:00:00',3],
                          ['2019-08-29 07:00:00',0],
                          ['2019-08-29 08:00:00',2],
                          ['2019-08-29 09:00:00',0],
                          ['2019-08-29 10:00:00',1]]).toDF(['DATETIME','VAL']).withColumn('DATETIME',col('DATETIME').cast('timestamp'))

我想生成一个列,其计数等于 0 值在 3 小时内出现的次数(当前时间的 +/- 1 小时,包括当前 Val)。可以使用以下方法创建窗口:

w1 = (Window()
 .orderBy(col('DATETIME').cast('long'))
 .rangeBetween(-(60*60), 60*60))

期望的结果:

+-------------------+---+---+
|           DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00|  0|  2|
|2019-08-29 02:00:00|  0|  3|
|2019-08-29 03:00:00|  0|  2|
|2019-08-29 04:00:00|  1|  1|
|2019-08-29 05:00:00|  2|  0|
|2019-08-29 06:00:00|  3|  1|
|2019-08-29 07:00:00|  0|  1|
|2019-08-29 08:00:00|  2|  2|
|2019-08-29 09:00:00|  0|  1|
|2019-08-29 10:00:00|  1|  1|
+-------------------+---+---+

【问题讨论】:

【参考方案1】:

如果您每个 DATETIME 只有 1 个条目,您可以使用 leadlag 函数来获取上一个和下一个值,然后您可以算零。

from pyspark.sql.functions import udf, array, col
from pyspark.sql.types import IntegerType

count_zeros_udf = udf(lambda arr: arr.count(0), IntegerType())

df.withColumn('lag1', f.lag(col('VAL'), 1, -1).over(Window.orderBy("DATETIME")))   # Get the previous value
.withColumn('lag2', f.lead(col('VAL'), 1, -1).over(Window.orderBy("DATETIME")))    # Get the next value
.withColumn('NUM', count_zeros_udf(array('VAL', 'lag1', 'lag2')))                  # Count zeros using the udf
.drop('lag1', 'lag2')                                                              # Drop the extra columns
.show()

+-------------------+---+---+
|           DATETIME|VAL|NUM|
+-------------------+---+---+
|2019-08-29 01:00:00|  0|  2|
|2019-08-29 02:00:00|  0|  3|
|2019-08-29 03:00:00|  0|  2|
|2019-08-29 04:00:00|  1|  1|
|2019-08-29 05:00:00|  2|  0|
|2019-08-29 06:00:00|  3|  1|
|2019-08-29 07:00:00|  0|  1|
|2019-08-29 08:00:00|  2|  2|
|2019-08-29 09:00:00|  0|  1|
|2019-08-29 10:00:00|  1|  1|
+-------------------+---+---+

使用 pyspark >= 2.4,您可以在窗口上使用 UDFpandas UDF,如下所述 User defined function to be applied to Window in PySpark? 。不幸的是,我没有 pyspark 2.4 或更高版本,因此无法对其进行测试。

【讨论】:

以上是关于如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 PySpark 中计算不同窗口大小的滚动总和

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

pySpark - 在滚动窗口中获取最大值行

pyspark 时间序列数据的高性能滚动/窗口聚合

使用窗口函数计算 PySpark 中的累积和

用二维窗口计算滚动函数的最快方法是啥?