Spark UDAF 动态输入模式处理
Posted
技术标签:
【中文标题】Spark UDAF 动态输入模式处理【英文标题】:Spark UDAF dynamic input schema handling 【发布时间】:2019-02-06 07:31:24 【问题描述】:我知道如何将具有内部结构的结构传递给 UDAF - Pass a struct to an UDAF in spark
但是我如何处理内部结构模式未知或动态的情况,因为它会根据数据发生变化。由于输入数据不符合特定模式,因此某些字段可能存在也可能不存在。假设一个数据集有
root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)
而其他数据集没有car3
root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
|-- name: string (nullable = true)
如何编写一个 UDAF,它接受基于输入数据而更改的架构。
【问题讨论】:
有趣的问题。您能否将您的汽车模式转换为例如 ArrayType(String) 并调整您的 UDAF 以使用它?然后你可以在其中包含可变数量的元素。 我想出了一种方法来处理它,通过初始化传递架构。 【参考方案1】:可以在初始化 Udaf 类时动态传递模式 -
val yetAnotherUdaf = new YetAnotherUdaf(schema)
case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction
override def deterministic:Boolean=true
override def dataType:DataType=schema
override def inputSchema:StructType=schema
override def bufferSchema:StructType=schema
override def initialize(buffer:MutableAggregationBuffer):Unit= ???
override def update(buffer:MutableAggregationBuffer, input:Row):Unit= ???
override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit=???
override def evaluate(buffer:Row):StructType= ???
【讨论】:
以上是关于Spark UDAF 动态输入模式处理的主要内容,如果未能解决你的问题,请参考以下文章
使用 ArrayType 作为 bufferSchema 的 Spark UDAF 性能问题