如何创建自定义 groupBy 聚合器?

Posted

技术标签:

【中文标题】如何创建自定义 groupBy 聚合器?【英文标题】:How to create custom groupBy aggregator? 【发布时间】:2017-06-19 21:53:39 【问题描述】:

我正在尝试创建自己的方差函数来计算 DataFrame groupBy 聚合步骤期间的方差。我不能使用 functions.variance() 方法,因为输入的大小存储在一列中。我想找到“PercentEaten”列的方差,其中项目总数存储在名为“NumberOfItems”的列中。

我对如何创建自己的返回 Column 类型的函数感到困惑,因为 .agg() 方法需要具有 Column 返回类型的函数。

这是我正在寻找的示例

myDF.groupBy(col("Store"), col("week")).agg(sum(col("PercentEaten")).divide(col("NumberOfItems")).as("MeanPercentEaten"), myVariance(col("PercentEaten"), col("NumberOfItems")).as("VariancePercentEaten");

我只是不确定如何定义 myVariance() 方法。这也是我第一次使用 Spark,所以我的编码风格可能不是最好的。

【问题讨论】:

【参考方案1】:

我只是不确定如何定义 myVariance() 方法。

这是用户定义的聚合函数(又名 UDAF)的示例。

要创建一个,您必须实现org.apache.spark.sql.expressions.UserDefinedAggregateFunction:

用于实现用户定义聚合函数 (UDAF) 的基类。

之后,您应该创建自定义 UDAF 的实例并使用 applydistinct 方法来使用它。

apply(Column... exprs) 使用给定的列作为输入参数为此 UDAF 创建一个列。

distinct(Column... exprs) 使用给定列的不同值作为输入参数为此 UDAF 创建一个列。


(我不会展示任何代码,因为 Java 不是我的 Spark 语言)。

【讨论】:

UDAF 是否适用于 pyspark?好像只有df.groupBy().agg() 我对 pyspark 一无所知。为什么df.groupBy().agg() 会使 UDF 在 pyspark 上不起作用? 我可以在另一个聚合器中使用来自一个聚合器的列吗?例如 df.groupBy().agg(mean(col().as("Mean"), variance(col(),col("Mean))); 所以我们在方差聚合器中聚合期间创建的平均值列? @mjsee 是的。我认为这引出了另一个问题,不是吗?然后你可以看到解决方案;-)

以上是关于如何创建自定义 groupBy 聚合器?的主要内容,如果未能解决你的问题,请参考以下文章

应用自定义 groupby 聚合函数在 pandas python 中输出二进制结果

pandas使用groupby函数进行分组聚合使用agg函数指定聚合统计计算的数值变量并自定义统计计算结果的名称(naming columns after aggregation)

pandas编写自定义函数计算多个数据列的加和(sum)使用groupby函数和apply函数聚合计算分组内多个数据列的加和

python中的自定义pivot_ui聚合器?

Liferay 7 - 博客聚合器的自定义样式

如何使用 shadow dom 创建聚合物自定义元素,以便可以访问它的 shadowRoot?