Spark DataFrame UDF 分区列
Posted
技术标签:
【中文标题】Spark DataFrame UDF 分区列【英文标题】:Spark DataFrame UDF Partitioning Columns 【发布时间】:2017-09-10 16:28:49 【问题描述】:我想转换一列。新列应该只包含原始列的一个分区。我定义了以下udf:
def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(16).toSeq(index))
稍后在循环中使用它
myDF = myDF.withColumn("measurement_"+i,extract(i)($"vector"))
原始向量列是通过以下方式创建的:
var vectors :Seq[Seq[Double]] = myVectors
vectors.toDF("vector")
但最后我得到以下错误:
Failed to execute user defined function(anonfun$user$sparkapp$MyClass$$extract$2$1: (array<double>) => array<double>)
我是否错误地定义了 udf?
【问题讨论】:
【参考方案1】:当我尝试提取不存在的元素时,我可以重现错误,即给出大于序列长度的索引:
val myDF = Seq(Seq(1.0, 2.0 ,3, 4.0), Seq(4.0,3,2,1)).toDF("vector")
myDF: org.apache.spark.sql.DataFrame = [vector: array<double>]
def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq(index))
// extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction
val i = 2
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show
给出这个错误:
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$extract$1: (array<double>) => array<double>)
您很可能在执行toSeq(index)
时遇到同样的问题,请尝试使用 toSeq.lift(index)
,如果索引超出范围,则返回 None:
def extract (index : Integer) = udf((v: Seq[Double]) => v.grouped(2).toSeq.lift(index))
extract: (index: Integer)org.apache.spark.sql.expressions.UserDefinedFunction
正常索引:
val i = 1
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show
+--------------------+-------------+
| vector|measurement_1|
+--------------------+-------------+
|[1.0, 2.0, 3.0, 4.0]| [3.0, 4.0]|
|[4.0, 3.0, 2.0, 1.0]| [2.0, 1.0]|
+--------------------+-------------+
索引超出范围:
val i = 2
myDF.withColumn("measurement_"+i,extract(i)($"vector")).show
+--------------------+-------------+
| vector|measurement_2|
+--------------------+-------------+
|[1.0, 2.0, 3.0, 4.0]| null|
|[4.0, 3.0, 2.0, 1.0]| null|
+--------------------+-------------+
【讨论】:
非常感谢,这个错误的调试花费了我很多时间。为您的详细回答 +1!以上是关于Spark DataFrame UDF 分区列的主要内容,如果未能解决你的问题,请参考以下文章
定义一个接受 Spark DataFrame 中对象数组的 UDF?
定义一个接受 Spark DataFrame 中对象数组的 UDF?