定义一个接受 Spark DataFrame 中对象数组的 UDF?

Posted

技术标签:

【中文标题】定义一个接受 Spark DataFrame 中对象数组的 UDF?【英文标题】:Defining a UDF that accepts an Array of objects in a Spark DataFrame? 【发布时间】:2016-08-17 21:11:23 【问题描述】:

使用 Spark 的 DataFrame 时,需要使用用户定义函数 (UDF) 来映射列中的数据。 UDF 要求显式指定参数类型。就我而言,我需要操作由对象数组组成的列,但我不知道要使用什么类型。这是一个例子:

import sqlContext.implicits._

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
  |
  |  "topic" : "pets",
  |  "subjects" : [
  |    "type" : "cat", "score" : 10,
  |    "type" : "dog", "score" : 1
  |  ]
  |
  """)))

使用内置的org.apache.spark.sql.functions对列中的数据进行基本操作比较简单

import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show

+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+

编写自定义 UDF 来执行任意操作通常很容易

import org.apache.spark.sql.functions.udf
val enhance = udf  topic : String => topic.toUpperCase() 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+

但是,如果我想使用 UDF 来操作“主题”列中的对象数组怎么办?我对 UDF 中的参数使用什么类型?比如我想重新实现size函数,而不是使用spark提供的那个:

val my_size = udf  subjects: Array[Something] => subjects.size 
data.select($"topic", my_size($"subjects")).show

显然Array[Something] 不起作用...我应该使用什么类型!?我应该完全放弃Array[] 吗?闲逛告诉我scala.collection.mutable.WrappedArray 可能与它有关,但我仍然需要提供另一种类型。

【问题讨论】:

【参考方案1】:

你要找的是Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row

val my_size = udf  subjects: Seq[Row] => subjects.size 

解释

ArrayType 的当前表示形式,如您所知,WrappedArray 所以Array 将不起作用,最好保持安全。 According to the official specification,StructType 的本地(外部)类型是 Row。不幸的是,这意味着对各个字段的访问不是类型安全的。

注意事项

要在 Spark struct,传递给 udf 的函数必须返回 Product 类型(Tuple*case class),而不是 Row。那是因为对应的udf变体depend on Scala reflection:

n 个参数的 Scala 闭包定义为用户定义函数 (UDF)。数据类型是根据 Scala 闭包的签名自动推断出来的。

在 Spark >= 2.3 中,可以直接返回 Row,as long as the schema is provided。

def udf(f: AnyRef, dataType: DataType): UserDefinedFunction 使用 Scala 闭包定义确定性用户定义函数 (UDF)。对于这个变体,调用者必须指定输出数据类型,并且没有自动输入类型强制。

参见例如How to create a Spark UDF in Java / Kotlin which returns a complex type?。

【讨论】:

以上是关于定义一个接受 Spark DataFrame 中对象数组的 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

如何在 Spark 中对 DataFrame 进行分区和写入而不删除没有新数据的分区?

我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?

以编程方式将几列添加到 Spark DataFrame

是否可以在 Pyspark 中对 DataFrame 进行子类化?

如何在 spark sql 中对数组进行成员操作?