在 pyspark 中编写自定义 UDAF

Posted

技术标签:

【中文标题】在 pyspark 中编写自定义 UDAF【英文标题】:Writing custom UDAF in pyspark 【发布时间】:2019-04-04 01:46:57 【问题描述】:

我需要编写一个 pySpark 自定义 UDAF,我遇到了这个示例 Applying UDFs on GroupedData in PySpark (with functioning python example) 。在类似的行上,如线程的最后部分所示,我想出了以下函数

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType()),
    StructField("bf_signature", Binary())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    bloomfilter = BloomFilter(8, 1)
    bloomfilter.set(df.value1)
    p=bloomfilter
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]+[p]])

df3.groupby("key").apply(g).show()

如代码所示,我想创建一个自定义 BloomFilter,它将为整个列构建bloomfilter,类似于 mean() 函数处理聚合整个列并为每个组生成一个聚合结果。

如何在 python 中编写这个自定义 UDAF?

【问题讨论】:

我没有找到任何解决方案,所以切换到Java UserdefinedAggregateFunction pyspark 现在支持带有 pandas 的 UDAF,请查看***.com/questions/40006395/… 【参考方案1】:

也许,这个blog 很有用,它在 python 中并不是真正的 UDAF,但它是编写类似功能的 hack。

破解方法是:

    将 groupBy 应用于 DF 在 agg() 函数中应用 collect_list() 将普通的 python UDF 函数应用于 collect_list() 的结果列表

【讨论】:

以上是关于在 pyspark 中编写自定义 UDAF的主要内容,如果未能解决你的问题,请参考以下文章

Spark sql实现自定义函数

Spark sql实现自定义函数

Spark sql实现自定义函数

Spark sql实现自定义函数

Hive中自定义UDAF函数生产小案例

(五)Hive的UDF、UDAF和UDTF自定义函数