定义一个接受 Spark DataFrame 中对象数组的 UDF?
Posted
技术标签:
【中文标题】定义一个接受 Spark DataFrame 中对象数组的 UDF?【英文标题】:Defining a UDF that accepts an Array of objects in a Spark DataFrame? 【发布时间】:2016-08-17 21:11:23 【问题描述】:使用 Spark 的 DataFrame 时,需要使用用户定义函数 (UDF) 来映射列中的数据。 UDF 要求显式指定参数类型。就我而言,我需要操作由对象数组组成的列,但我不知道要使用什么类型。这是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|
| "topic" : "pets",
| "subjects" : [
| "type" : "cat", "score" : 10,
| "type" : "dog", "score" : 1
| ]
|
""")))
使用内置的org.apache.spark.sql.functions
对列中的数据进行基本操作比较简单
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
编写自定义 UDF 来执行任意操作通常很容易
import org.apache.spark.sql.functions.udf
val enhance = udf topic : String => topic.toUpperCase()
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
但是,如果我想使用 UDF 来操作“主题”列中的对象数组怎么办?我对 UDF 中的参数使用什么类型?比如我想重新实现size函数,而不是使用spark提供的那个:
val my_size = udf subjects: Array[Something] => subjects.size
data.select($"topic", my_size($"subjects")).show
显然Array[Something]
不起作用...我应该使用什么类型!?我应该完全放弃Array[]
吗?闲逛告诉我scala.collection.mutable.WrappedArray
可能与它有关,但我仍然需要提供另一种类型。
【问题讨论】:
【参考方案1】:你要找的是Seq[o.a.s.sql.Row]
:
import org.apache.spark.sql.Row
val my_size = udf subjects: Seq[Row] => subjects.size
解释:
ArrayType
的当前表示形式,如您所知,WrappedArray
所以Array
将不起作用,最好保持安全。
According to the official specification,StructType
的本地(外部)类型是 Row
。不幸的是,这意味着对各个字段的访问不是类型安全的。
注意事项:
要在 Spark struct,传递给 udf
的函数必须返回 Product
类型(Tuple*
或 case class
),而不是 Row
。那是因为对应的udf
变体depend on Scala reflection:
将 n 个参数的 Scala 闭包定义为用户定义函数 (UDF)。数据类型是根据 Scala 闭包的签名自动推断出来的。
在 Spark >= 2.3 中,可以直接返回 Row
,as long as the schema is provided。
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
使用 Scala 闭包定义确定性用户定义函数 (UDF)。对于这个变体,调用者必须指定输出数据类型,并且没有自动输入类型强制。
参见例如How to create a Spark UDF in Java / Kotlin which returns a complex type?。
【讨论】:
以上是关于定义一个接受 Spark DataFrame 中对象数组的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中对嵌套的 Dataframe 进行平面映射
如何在 Spark 中对 DataFrame 进行分区和写入而不删除没有新数据的分区?
我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?