Pyspark:在窗口内使用 udf

Posted

技术标签:

【中文标题】Pyspark:在窗口内使用 udf【英文标题】:Pyspark: using udf within window 【发布时间】:2018-03-29 12:28:49 【问题描述】:

我需要使用 Pyspark 检测时间序列的阈值。 在下面的示例图中,我想检测(通过存储关联的时间戳)参数 ALT_STD 大于 5000 然后小于 5000 的每次出现。

对于这个简单的案例,我可以运行简单的查询,例如

t_start = df.select('timestamp')\
                .filter(df.ALT_STD > 5000)\
                .sort('timestamp')\
                .first()
t_stop = df.select('timestamp')\
               .filter((df.ALT_STD < 5000)\                           
                       & (df.timestamp > t_start.timestamp))\
               .sort('timestamp')\
               .first()

但是,在某些情况下,事件可能是循环的,我可能有几条曲线(即 ALT_STD 会多次升高或低于 5000)。当然,如果我使用上面的查询,我将只能检测到第一次出现。

我想我应该使用带有 udf 的窗口函数,但我找不到可行的解决方案。 我的猜测是算法应该是这样的:

windowSpec = Window.partitionBy('flight_hash')\
                   .orderBy('timestamp')\
                   .rowsBetween(Window.currentRow, 1)

def detect_thresholds(x):
    if (x['ALT_STD'][current_row]< 5000) and (x['ALT_STD'][next_row] > 5000):
        return x['timestamp'] #Or maybe simply 1
    if (x['ALT_STD'][current_row]> 5000) and (x['ALT_STD'][current_row] > 5000):
    return x['timestamp'] #Or maybe simply 2
    else:
        return 0

import pyspark.sql.functions as F
detect_udf = F.udf(detect_threshold, IntegerType())
df.withColumn('Result', detect_udf(F.Struct('ALT_STD')).over(windowSpec).show()

这种算法在 Pyspark 中可行吗?怎么样?

后记: 作为旁注,我已经了解如何使用 udf 或 udf 和内置 sql 窗口函数,但不了解如何组合 udf AND window。 例如:

# This will compute the mean (built-in function)
df.withColumn("Result", F.mean(df['ALT_STD']).over(windowSpec)).show()

# This will also work
divide_udf = F.udf(lambda x: x[0]/1000., DoubleType())
df.withColumn('result', divide_udf(F.struct('timestamp')))

【问题讨论】:

【参考方案1】:

感谢 user9569772 的回答,我发现了。他的解决方案不起作用,因为 .lag() 或 .lead() 是窗口函数。

from pyspark.sql.functions import when
from pyspark.sql import functions as F

# Define conditions
det_start = (F.lag(F.col('ALT_STD')).over(windowSpec) < 100)\
          & (F.lead(F.col('ALT_STD'), 0).over(windowSpec) >= 100)
det_end = (F.lag(F.col('ALT_STD'), 0).over(windowSpec) > 100)\
        & (F.lead(F.col('ALT_STD')).over(windowSpec) < 100)

# Combine conditions with .when() and .otherwise()
result = (when(det_start, 1)\
       .when(det_end, 2)\
       .otherwise(0))

df.withColumn("phases", result).show()

【讨论】:

【参考方案2】:

这里不需要 udf(而且 python udfs 不能用作窗口函数)。只需将lead / lagwhen 一起使用:

from pyspark.sql.functions import col, lag, lead, when

result = (when((col('ALT_STD') < 5000) & (lead(col('ALT_STD'), 1) > 5000), 1)
    .when(col('ALT_STD') > 5000) & (lead(col('ALT_STD'), 1) < 5000), 1)
    .otherwise(0))

df.withColum("result", result)

【讨论】:

感谢您的回答。但是,运行此代码时出现错误:Py4JJavaError:调用 o18780.showString 时发生错误。 : java.lang.UnsupportedOperationException: 无法评估表达式:lead(input[2, bigint, true], 1, null) 另外,lag 被导入但在你的例子中没有使用,这是一个错误吗?

以上是关于Pyspark:在窗口内使用 udf的主要内容,如果未能解决你的问题,请参考以下文章

pyspark如何在窗口内聚合

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

pySpark - 在滚动窗口中获取最大值行

如何在 PySpark 中计算不同窗口大小的滚动总和

pyspark 时间序列数据的高性能滚动/窗口聚合

(Pyspark - 在一段时间内按用户分组