在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?

Posted

技术标签:

【中文标题】在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?【英文标题】:In Spark dataframe udf, what is the type of function parameters which like struct(col1,col2)?在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是什么? 【发布时间】:2018-03-10 08:49:36 【问题描述】:

背景:

我有一个包含三列的数据框:id, x, y。 x,y 是双倍的。

首先我struct (col("x"),col("y"))获取坐标列。 然后groupBy(col("id"))agg(collect_list(col("coordinate")))

所以现在 df 只有两列:id ,coordinate

我认为坐标的数据类型是collection.mutable.WrappedArray[(Double,Double)]。 所以我把它传给了udf。但是,数据类型是错误的。运行代码时出现错误。我不知道为什么。 struct(col1,col2) 的真正数据类型是什么?或者有没有其他方法可以轻松得到正确答案?

这是代码:

def getMedianPoint = udf((array1: collection.mutable.WrappedArray[(Double,Double)]) =>   
    var l = (array1.length/2)
    var c = array1(l)
    val x = c._1.asInstanceOf[Double]
    val y = c._2.asInstanceOf[Double]
    (x,y)
)

df.withColumn("coordinate",struct(col("x"),col("y")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate")
  .withColumn("median",getMedianPoint(col("coordinate")))

非常感谢!

【问题讨论】:

所以数据类型应该是 Seq[Row] 基于 Ramesh Maharjan 的回答。 【参考方案1】:

我认为坐标的数据类型是collection.mutable.WrappedArray[(Double,Double)]

是的,你说的完全正确您在 udf 函数中定义为 dataTypes 的内容以及您作为参数传递的内容也是正确的。但是主要问题是结构列的键名。因为你一定有以下问题

由于数据类型不匹配,无法解析 'UDF(coordinate)':参数 1 需要 array> 类型,但是,'coordinate' 属于 array> 类型。;;

只需使用alias 将结构键重命名为

,错误就会消失
df.withColumn("coordinate",struct(col("x").as("_1"),col("y").as("_2")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate"))
    .withColumn("median",getMedianPoint(col("coordinate")))

以便键名匹配。

但是

这将引发另一个问题

  var c = array1(l)

原因:java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 无法转换为 scala.Tuple2

所以我建议你把udf函数改成

import org.apache.spark.sql.functions._

def getMedianPoint = udf((array1: Seq[Row]) => 
  var l = (array1.length/2)
  (array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
)

这样您甚至都不需要使用alias。所以完整的解决方案是

import org.apache.spark.sql.functions._

def getMedianPoint = udf((array1: Seq[Row]) => 
  var l = (array1.length/2)
  (array1(l)(0).asInstanceOf[Double], array1(l)(1).asInstanceOf[Double])
)

df.withColumn("coordinate",struct(col("x"),col("y")))
  .groupBy(col("id"))
  .agg(collect_list("coordinate").as("coordinate"))
    .withColumn("median",getMedianPoint(col("coordinate")))
  .show(false)

希望回答对你有帮助

【讨论】:

是的,我试过了。但是它抛出了一个新异常Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 如果答案对您有帮助,请考虑接受它:) 对不起,对不起。只是有点忙。它非常有用。非常感谢!

以上是关于在 Spark 数据帧 udf 中,像 struct(col1,col2) 这样的函数参数的类型是啥?的主要内容,如果未能解决你的问题,请参考以下文章

我可以将 spark 数据帧作为参数发送给 pandas UDF

Apache Spark - 注册 UDF - 返回数据帧

如何从 Spark 数据帧中的 When 子句将多个列发送到 udf?

Scala中的Spark分组映射UDF

如何从Spark数据帧中的When子句向udf发送多个列?

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException