如何在 PySpark 中将 sql 函数与 UDAF 组合/链接

Posted

技术标签:

【中文标题】如何在 PySpark 中将 sql 函数与 UDAF 组合/链接【英文标题】:How to combine/chain sql functions with UDAFs in PySpark 【发布时间】:2019-11-15 03:47:09 【问题描述】:

我正在尝试在 PySpark 中的 Spark 数据帧上使用一堆预定义的 sql 函数以及我自己的 UDAF

    @F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

   funs = [mean, max, min, stddev, approxCountDistinct, mode]
   columns = df.columns
   expr = [f(col(c)) for f in funs for c in columns]

   s = df.agg(*expr).collect()

当我尝试将我的 udf 与其他功能一起使用时,我得到: org.apache.spark.sql.AnalysisException:分组表达式序列为空。如果您不在乎得到哪个值,则将 '(avg(CAST(DBN AS DOUBLE)) AS avg(DBN) 包装在窗口函数中或将 'DBN' 包装在 first() (或 first_value)中。 ;;

但是当我跑步时:

funs = [mode]
   columns = df.columns
   expr = [f(collect_list(col(c))) for f in funs for c in columns]

   s = df.agg(*expr).collect()

它给出了正确的结果,但仅适用于我的 UDF 而不是其他功能。

有没有办法可以将 collect_list 函数结合到我的 udf 中,以便我可以将我的 udf 与其他函数一起运行。

【问题讨论】:

你的UDF是哪个? mode?我看到 funs = [mode] 只有你的 UDF,所以它只会返回 UDF 的值 【参考方案1】:

您收到错误是因为您在应该使用 UDAF 的聚合函数中使用 udf。 1.您可以通过How to define and use a User-Defined Aggregate Function in Spark SQL?定义自己的UDAF,或者 2.您可以手动进行聚合,然后传递给您的udf。由于您想在调用 udf 之前使用 collect_list,您可以执行以下操作:

@F.udf
    def mode(v):
     from collections import Counter
     x = [w[0] for w in Counter(v).most_common(5)]
     return x

funs = [mean, max, min, stddev, approxCountDistinct, mode]
my_funs = [mode]
expr = [f(collect_list(col(c))) if f in my_funs  else f(col(c)) for f in funs for c in columns]
s = df.agg(*expr).collect()

上面的代码中,collect_list用于在对列调用udf之前进行聚合。

【讨论】:

以上是关于如何在 PySpark 中将 sql 函数与 UDAF 组合/链接的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中将GUID转换为整数

如何在 Pyspark 中将字符串更改为时间戳?

在 pyspark SQL 中将字符串日期转换为日期格式

如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?

如何在 SQL 中将 CASE 表达式与 SUM 函数一起使用?

如何在pyspark中将字符串列转换为ArrayType