在 Spark SQL 中使用 UDF 函数后,如何修复这种类型的错误?

Posted

技术标签:

【中文标题】在 Spark SQL 中使用 UDF 函数后,如何修复这种类型的错误?【英文标题】:How to fix this type error, after using UDF function in Spark SQL? 【发布时间】:2018-08-21 09:45:12 【问题描述】:

我想将我的features(type: sparse vector of ml.linalg) 分解为每个特征的索引和值,所以我做了以下事情:

def zipKeyValue(vec:linalg.Vector) : Array[(Int,Double)] = 
  val indice:Array[Int] = vec.toSparse.indices;
  val value:Array[Double] = vec.toSparse.values;
  indice.zip(value)

val udf1 = udf( zipKeyValue _)
val df1 = df.withColumn("features",udf1(col("features")));
val df2 = df1.withColumn("features",explode(col("features")) );   
val udf2 = udf( ( f:Tuple2[Int,Double]) => f._1.toString )   ;
val udf3 = udf( (f:Tuple2[Int,Double]) =>f._2) ;
val df3 = df2.withColumn("key",udf2(col("features"))).withColumn("value",udf3(col("features")));
df3.show();

但我得到了错误: Failed to execute user defined function(anonfun$38: (struct<_1:int,_2:double>) => string) Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2

我很困惑,因为我的函数zipKeyValue 返回一个Tuple2[(Int,Double)],但实际上我得到了一个struct&lt;_1:int,_2:double&gt;。我该如何解决?

【问题讨论】:

【参考方案1】:

这里不需要UDF。只需选择列。

df2
  .withColumn("key", col("features._1"))
  .withColumn("value", col("features._2"))

一般情况下,您应该使用Rows 而不是Tuples

import org.apache.spark.sql.Row

val udf2 = udf((f: Row) => f.getInt(0).toString)
val udf3 = udf((f: Row) => f.getDouble(1))

【讨论】:

以上是关于在 Spark SQL 中使用 UDF 函数后,如何修复这种类型的错误?的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL UDF示例

Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF

无法在 spark sql 中注册 UDF

详解Spark sql用户自定义函数:UDF与UDAF

尝试从 UDF 执行 spark sql 查询

Scala 和 Spark UDF 函数