Spark DataFrame UDF 分区列

Posted

技术标签:

【中文标题】Spark DataFrame UDF 分区列【英文标题】:Spark DataFrame UDF Partitioning Columns 【发布时间】:2017-09-10 16:28:49 【问题描述】:

我想转换一列。新列应该只包含原始列的一个分区。我定义了以下udf:

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(16).toSeq(index))

稍后在循环中使用它

myDF = myDF.withColumn("measurement_"+i,extract(i)($"vector"))

原始向量列是通过以下方式创建的:

var vectors :Seq[Seq[Double]] = myVectors
vectors.toDF("vector")

但最后我得到以下错误:

Failed to execute user defined function(anonfun$user$sparkapp$MyClass$$extract$2$1: (array<double>) => array<double>)

我是否错误地定义了 udf?

【问题讨论】:

【参考方案1】:

当我尝试提取不存在的元素时,我可以重现错误,即给出大于序列长度的索引:

val myDF = Seq(Seq(1.0, 2.0 ,3, 4.0), Seq(4.0,3,2,1)).toDF("vector")
myDF: org.apache.spark.sql.DataFrame = [vector: array<double>]

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq(index))
// extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction

val i = 2

myDF.withColumn("measurement_"+i,extract(i)($"vector")).show

给出这个错误:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$extract$1: (array<double>) => array<double>)

您很可能在执行toSeq(index) 时遇到同样的问题,请尝试使用 toSeq.lift(index),如果索引超出范围,则返回 None:

def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq.lift(index))
extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction

正常索引

val i = 1    
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show
+--------------------+-------------+
|              vector|measurement_1|
+--------------------+-------------+
|[1.0, 2.0, 3.0, 4.0]|   [3.0, 4.0]|
|[4.0, 3.0, 2.0, 1.0]|   [2.0, 1.0]|
+--------------------+-------------+

索引超出范围

val i = 2
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show
+--------------------+-------------+
|              vector|measurement_2|
+--------------------+-------------+
|[1.0, 2.0, 3.0, 4.0]|         null|
|[4.0, 3.0, 2.0, 1.0]|         null|
+--------------------+-------------+

【讨论】:

非常感谢,这个错误的调试花费了我很多时间。为您的详细回答 +1!

以上是关于Spark DataFrame UDF 分区列的主要内容,如果未能解决你的问题,请参考以下文章

定义一个接受 Spark DataFrame 中对象数组的 UDF?

定义一个接受 Spark DataFrame 中对象数组的 UDF?

Spark:保存按“虚拟”列分区的 DataFrame

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列

如何在 Spark SQL 中将额外参数传递给 UDF?

在 RDD 转换上保留 Spark DataFrame 列分区