如何创建自定义 groupBy 聚合器?
Posted
技术标签:
【中文标题】如何创建自定义 groupBy 聚合器?【英文标题】:How to create custom groupBy aggregator? 【发布时间】:2017-06-19 21:53:39 【问题描述】:我正在尝试创建自己的方差函数来计算 DataFrame groupBy 聚合步骤期间的方差。我不能使用 functions.variance() 方法,因为输入的大小存储在一列中。我想找到“PercentEaten”列的方差,其中项目总数存储在名为“NumberOfItems”的列中。
我对如何创建自己的返回 Column 类型的函数感到困惑,因为 .agg() 方法需要具有 Column 返回类型的函数。
这是我正在寻找的示例
myDF.groupBy(col("Store"), col("week")).agg(sum(col("PercentEaten")).divide(col("NumberOfItems")).as("MeanPercentEaten"), myVariance(col("PercentEaten"), col("NumberOfItems")).as("VariancePercentEaten");
我只是不确定如何定义 myVariance() 方法。这也是我第一次使用 Spark,所以我的编码风格可能不是最好的。
【问题讨论】:
【参考方案1】:我只是不确定如何定义 myVariance() 方法。
这是用户定义的聚合函数(又名 UDAF)的示例。
要创建一个,您必须实现org.apache.spark.sql.expressions.UserDefinedAggregateFunction:
用于实现用户定义聚合函数 (UDAF) 的基类。
之后,您应该创建自定义 UDAF 的实例并使用 apply
或 distinct
方法来使用它。
apply(Column... exprs) 使用给定的列作为输入参数为此 UDAF 创建一个列。
distinct(Column... exprs) 使用给定列的不同值作为输入参数为此 UDAF 创建一个列。
(我不会展示任何代码,因为 Java 不是我的 Spark 语言)。
【讨论】:
UDAF 是否适用于 pyspark?好像只有df.groupBy().agg() 我对 pyspark 一无所知。为什么df.groupBy().agg()
会使 UDF 在 pyspark 上不起作用?
我可以在另一个聚合器中使用来自一个聚合器的列吗?例如 df.groupBy().agg(mean(col().as("Mean"), variance(col(),col("Mean))); 所以我们在方差聚合器中聚合期间创建的平均值列?
@mjsee 是的。我认为这引出了另一个问题,不是吗?然后你可以看到解决方案;-)以上是关于如何创建自定义 groupBy 聚合器?的主要内容,如果未能解决你的问题,请参考以下文章
应用自定义 groupby 聚合函数在 pandas python 中输出二进制结果
pandas使用groupby函数进行分组聚合使用agg函数指定聚合统计计算的数值变量并自定义统计计算结果的名称(naming columns after aggregation)
pandas编写自定义函数计算多个数据列的加和(sum)使用groupby函数和apply函数聚合计算分组内多个数据列的加和