如何在 PySpark 中将 sql 函数与 UDAF 组合/链接
Posted
技术标签:
【中文标题】如何在 PySpark 中将 sql 函数与 UDAF 组合/链接【英文标题】:How to combine/chain sql functions with UDAFs in PySpark 【发布时间】:2019-11-15 03:47:09 【问题描述】:我正在尝试在 PySpark 中的 Spark 数据帧上使用一堆预定义的 sql 函数以及我自己的 UDAF
@F.udf
def mode(v):
from collections import Counter
x = [w[0] for w in Counter(v).most_common(5)]
return x
funs = [mean, max, min, stddev, approxCountDistinct, mode]
columns = df.columns
expr = [f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()
当我尝试将我的 udf 与其他功能一起使用时,我得到:
org.apache.spark.sql.AnalysisException:分组表达式序列为空。如果您不在乎得到哪个值,则将 '(avg(CAST(DBN
AS DOUBLE)) AS avg(DBN)
包装在窗口函数中或将 'DBN
' 包装在 first() (或 first_value)中。 ;;
但是当我跑步时:
funs = [mode]
columns = df.columns
expr = [f(collect_list(col(c))) for f in funs for c in columns]
s = df.agg(*expr).collect()
它给出了正确的结果,但仅适用于我的 UDF 而不是其他功能。
有没有办法可以将 collect_list 函数结合到我的 udf 中,以便我可以将我的 udf 与其他函数一起运行。
【问题讨论】:
你的UDF是哪个?mode
?我看到 funs = [mode]
只有你的 UDF,所以它只会返回 UDF 的值
【参考方案1】:
您收到错误是因为您在应该使用 UDAF 的聚合函数中使用 udf。 1.您可以通过How to define and use a User-Defined Aggregate Function in Spark SQL?定义自己的UDAF,或者 2.您可以手动进行聚合,然后传递给您的udf。由于您想在调用 udf 之前使用 collect_list,您可以执行以下操作:
@F.udf
def mode(v):
from collections import Counter
x = [w[0] for w in Counter(v).most_common(5)]
return x
funs = [mean, max, min, stddev, approxCountDistinct, mode]
my_funs = [mode]
expr = [f(collect_list(col(c))) if f in my_funs else f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()
上面的代码中,collect_list用于在对列调用udf之前进行聚合。
【讨论】:
以上是关于如何在 PySpark 中将 sql 函数与 UDAF 组合/链接的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?