用户定义的聚合函数 Spark Java - 合并问题

Posted

技术标签:

【中文标题】用户定义的聚合函数 Spark Java - 合并问题【英文标题】:User Defined Aggregate Functions Spark Java - merge problem 【发布时间】:2021-08-20 15:42:26 【问题描述】:

我正在尝试使用文档here 之后的用户定义的聚合函数,我想首先将 2 个值 x 和 y 传递给 SimpleRegression,然后通过追加合并 simpleRegression。我的问题是 reduce 函数正确接收值(如果我要求它打印 x 和 y 它会正确打印它们),但是如果我看到 @987654324 中的回归器添加了多少值@ 函数(使用提供的代码中显示的getN() 方法)它返回我没有向这些回归器添加任何值,也就是说,好像没有对它们执行addData(),为什么会发生这种情况?我做错了什么?

显然,这不允许我做我想做的事情:获取每条回归线的斜率和截距,因为当执行 finish 函数时,回归量为空,因此斜率和截距设置为 NaN。

这是我的代码(Java):

public static class RegressorAggregator extends Aggregator<Tuple2<Long, Long>, SimpleRegressionWrapper, LineParameters> 


    //Valore zero per l'aggregazione - dovrebbe soddisfare a+zero=a;
    public SimpleRegressionWrapper zero()
        return new SimpleRegressionWrapper();
    

    public SimpleRegressionWrapper reduce(SimpleRegressionWrapper simpleRegression, Tuple2<Long, Long> xy)
        double x = (double)xy._1;
        double y = (double)xy._2;
        simpleRegression.addData(x,y);
        return simpleRegression;
    

    public SimpleRegressionWrapper merge(SimpleRegressionWrapper a, SimpleRegressionWrapper b)
        Logger log = LogManager.getLogger(getClass().getSimpleName());
        log.error(a.getN() + " " + b.getN());
        a.append(b);
        return a;
    

    public LineParameters finish(SimpleRegressionWrapper simpleRegression)
        return new LineParameters(simpleRegression.getSlope(), simpleRegression.getIntercept());
    

    public Encoder<SimpleRegressionWrapper> bufferEncoder()
        return Encoders.bean(SimpleRegressionWrapper.class);
    

    public Encoder<LineParameters> outputEncoder()
        return Encoders.bean(LineParameters.class);
    


【问题讨论】:

【参考方案1】:

改变这个问题可以解决:

public Encoder<SimpleRegressionWrapper> bufferEncoder()
        return Encoders.bean(SimpleRegressionWrapper.class);
    

进入这个:

public Encoder<SimpleRegressionWrapper> bufferEncoder()
        return Encoders.javaSerialization(SimpleRegressionWrapper.class);
    

【讨论】:

以上是关于用户定义的聚合函数 Spark Java - 合并问题的主要内容,如果未能解决你的问题,请参考以下文章

极简spark教程spark聚合函数

如何在 Spark SQL 中定义和使用用户定义的聚合函数?

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数

在 pyspark 中应用用户定义的聚合函数的替代方法

在Apache Spark中使用UDF

JAVA spark数据集中的GroupBy和聚合函数