如何在 Spark 中创建有状态的 UDF?

Posted

技术标签:

【中文标题】如何在 Spark 中创建有状态的 UDF?【英文标题】:How to Create Stateful UDFs in Spark? 【发布时间】:2020-05-07 12:58:43 【问题描述】:

我想在 spark 中创建一个 udf 以使用不适合 scala fp 样式且仅修改其内部状态的 Java 数据结构。为简化起见,这是我正在使用的骨架 java 类。

public class MagicStats 
    List<Long> rawData;

    public void processDataPoint(long dataPoint) 
        rawData.add(dataPoint);
        //some magic processing
    

    public void merge(MagicStats anotherMagicStats) 
        //merge with another instance to combine states
    

    public long eval() 
        //do some magic with data
        //return result
    

关于我在这门课上要做什么的更多背景知识。我有一些按天存储的数据,对于每天的分区,我会生成一些汇总统计信息,包括计数、平均值等,以及这个特殊的 MagicStats(将由课堂上的eval() 获得)并将它们保存在数据库中。这个 MagicStats 的特别之处在于:

    我需要数据的每日 MagicStats 结果。 我需要每月汇总每日 MagicStats 结果(无法从每日结果进行算术计算,只能由班级处理)。

如您所见,第二个要求意味着我必须为每个每日分区拍摄 MagicStats 对象的快照,并将其作为原始字节存储在数据库的列中,以便在每月聚合时我可以重建所有30 个来自字节数组的内存中的 MagicStats 对象并调用merge(MagicStats) and then eval() 以正确聚合。

现在是困难的部分。如何创建一个不从输入流返回结果而是修改 java 对象的内部状态的 udf?这是我卡在的地方(下面的伪代码):

//input_monthly_data
// +----------+------+
// |   day    | value|
// +----------+------+
// |2020-01-01|  3000|
// |2020-01-02|  4500|
// |2020-01-03|  3500|
// |..........|  ....|
// +----------+------+

val df = spark.read.json("input_monthly_data.json")
df.groupby("day").agg(MyUDF(data).as("daily stats").select("daily stats", "avg", "count").saveAsTable()

class MyUDF extends UserDefinedFunction 
    def apply(input: Long): Column = 
        //create a static MagicStats instance
        //update the state of the instance per data point
        //serialize this instance to bytes after done for each day's partition
        //return the bytes for later persistence
    

//output_monthly_data
// +----------+------+-----------------+
// |   day    | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01|  10  | some binary.    |
// |2020-01-02|  20  | some binary.    |
// |2020-01-03|  25  | some binary.    |
// |..........|  ....| some binary.    |
// +----------+------+-----------------|

任何关于如何使这个 UDF 工作或以其他方式实现我的目标的建议将不胜感激!

【问题讨论】:

【参考方案1】:

我想也许你想实现 UserDefinedAggregateFunction 而不是 UserDefinedFunction

为了产生聚合结果,UserDefinedAggregateFunction 具有更新给定组中每个数据点状态的概念,这似乎就是您所追求的。

查看这些链接了解更多信息:

User-defined aggregate functions - Scala UserDefinedAggregateFunction  —  Contract for UDAFs

【讨论】:

我认为 UserDefinedAggregateFunction 是有道理的,但我意识到目前 Spark 不再允许 UserDefinedType(参见answer)。这意味着我不能真正使用我的自定义数据类型创建一个 UserDefinedAggregateFunction 用于聚合(在我的情况下为 MagicStats)。关于如何解决这个问题的任何想法? @diyun 我没有用 UDAF 尝试过,但我认为你可以将聚合的结果设为 StructType。像StructType(Seq(StructField("rawData", ArrayType(LongType)))) 这样的东西。虽然如果您实际上只需要 long 数组,可能只是 ArrayType(LongType),然后将其放入您的类的实例中,或者在聚合之后 执行您需要执行的任何操作。这篇文章可能会有所帮助:***.com/questions/46676785/…

以上是关于如何在 Spark 中创建有状态的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 中的重型有状态 UDF

如何在构建有状态小部件之前等待功能? [复制]

如何在 Java 中创建一个接受字符串数组的 Spark UDF?

关于在 Spark Scala 中创建用户定义函数 (UDF)

火花在UDF中创建数据框

在 Spark 中创建 UDF 时出错