如何在 Spark 中创建有状态的 UDF?
Posted
技术标签:
【中文标题】如何在 Spark 中创建有状态的 UDF?【英文标题】:How to Create Stateful UDFs in Spark? 【发布时间】:2020-05-07 12:58:43 【问题描述】:我想在 spark 中创建一个 udf 以使用不适合 scala fp 样式且仅修改其内部状态的 Java 数据结构。为简化起见,这是我正在使用的骨架 java 类。
public class MagicStats
List<Long> rawData;
public void processDataPoint(long dataPoint)
rawData.add(dataPoint);
//some magic processing
public void merge(MagicStats anotherMagicStats)
//merge with another instance to combine states
public long eval()
//do some magic with data
//return result
关于我在这门课上要做什么的更多背景知识。我有一些按天存储的数据,对于每天的分区,我会生成一些汇总统计信息,包括计数、平均值等,以及这个特殊的 MagicStats(将由课堂上的eval()
获得)并将它们保存在数据库中。这个 MagicStats 的特别之处在于:
-
我需要数据的每日 MagicStats 结果。
我需要每月汇总每日 MagicStats 结果(无法从每日结果进行算术计算,只能由班级处理)。
如您所见,第二个要求意味着我必须为每个每日分区拍摄 MagicStats 对象的快照,并将其作为原始字节存储在数据库的列中,以便在每月聚合时我可以重建所有30 个来自字节数组的内存中的 MagicStats 对象并调用merge(MagicStats) and then eval()
以正确聚合。
现在是困难的部分。如何创建一个不从输入流返回结果而是修改 java 对象的内部状态的 udf?这是我卡在的地方(下面的伪代码):
//input_monthly_data
// +----------+------+
// | day | value|
// +----------+------+
// |2020-01-01| 3000|
// |2020-01-02| 4500|
// |2020-01-03| 3500|
// |..........| ....|
// +----------+------+
val df = spark.read.json("input_monthly_data.json")
df.groupby("day").agg(MyUDF(data).as("daily stats").select("daily stats", "avg", "count").saveAsTable()
class MyUDF extends UserDefinedFunction
def apply(input: Long): Column =
//create a static MagicStats instance
//update the state of the instance per data point
//serialize this instance to bytes after done for each day's partition
//return the bytes for later persistence
//output_monthly_data
// +----------+------+-----------------+
// | day | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01| 10 | some binary. |
// |2020-01-02| 20 | some binary. |
// |2020-01-03| 25 | some binary. |
// |..........| ....| some binary. |
// +----------+------+-----------------|
任何关于如何使这个 UDF 工作或以其他方式实现我的目标的建议将不胜感激!
【问题讨论】:
【参考方案1】:我想也许你想实现 UserDefinedAggregateFunction
而不是 UserDefinedFunction
。
为了产生聚合结果,UserDefinedAggregateFunction
具有更新给定组中每个数据点状态的概念,这似乎就是您所追求的。
查看这些链接了解更多信息:
User-defined aggregate functions - Scala UserDefinedAggregateFunction — Contract for UDAFs【讨论】:
我认为 UserDefinedAggregateFunction 是有道理的,但我意识到目前 Spark 不再允许 UserDefinedType(参见answer)。这意味着我不能真正使用我的自定义数据类型创建一个 UserDefinedAggregateFunction 用于聚合(在我的情况下为 MagicStats)。关于如何解决这个问题的任何想法? @diyun 我没有用 UDAF 尝试过,但我认为你可以将聚合的结果设为 StructType。像StructType(Seq(StructField("rawData", ArrayType(LongType))))
这样的东西。虽然如果您实际上只需要 long 数组,可能只是 ArrayType(LongType)
,然后将其放入您的类的实例中,或者在聚合之后 执行您需要执行的任何操作。这篇文章可能会有所帮助:***.com/questions/46676785/…以上是关于如何在 Spark 中创建有状态的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Java 中创建一个接受字符串数组的 Spark UDF?