在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构

Posted

技术标签:

【中文标题】在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构【英文标题】:getting Schema for type org.apache.spark.sql.Column is not supported while running UDF in spark dataframe 【发布时间】:2019-12-23 17:34:58 【问题描述】:

我正在尝试连接 spark 数据框中的列数组,我正在通过 spark scala UDF 接收列数组。

这是我的代码:

val aaa = Map(("00","DELHI") -> (List("key1","key2","key3"),List("a")))

   val sampleDf = sparksession.createDataFrame(
      List(("00", "DELHI", "111", "222", "333"), ("00", "SP", "123123123", "231231231", "312312312")
      )).toDF("RecordType", "CITY", "key1", "key2", "key3")  //.printSchema() //.show(100,false)

    val test2 = sampleDf.withColumn("primayKEY",concat(getprimakey(aaa)(col("RecordType"),col("CITY")))).show()//.printSchema()//show(false)


  def getprimakey (mapconfig: Map[(String, String), (List[String], List[String])]) =  udf((rec:String ,layout:String) => 
    println(rec+""+layout)
    val s = mapconfig(rec,layout)._1.map(x => col(x)).toArray//.map(x => col(x))
    s
  )

下面是我遇到的错误

Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:733)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:693)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:671)
    at org.apache.spark.sql.functions$.udf(functions.scala:3088)
    at com.rupesh.TEST_UDF$.getprimakey(TEST_UDF.scala:29)
    at com.rupesh.TEST_UDF$.main(TEST_UDF.scala:19)
    at com.rupesh.TEST_UDF.main(TEST_UDF.scala)

【问题讨论】:

@blackbishop 对此有何解决方案? 您正试图从您的 UDF 返回一个列数组。那是你的问题。 【参考方案1】:

您只能访问您传递给 UDF 的 UDF 中的字段。因此,您的逻辑需要整行,这可以通过传递 struct("*") 来完成:

def getprimakey(mapconfig: Map[(String, String), (List[String], List[String])]) = udf((rec: String, layout: String, entireRow:Row) => 
  mapconfig.get(rec,layout).map(_._1)
  .map(k => k.map(entireRow.getAs[String](_)))
  .map(_.mkString)
)

sampleDf.withColumn("primayKEY", getprimakey(aaa)(col("RecordType"), col("CITY"), struct("*"))).show() 

+----------+-----+---------+---------+---------+---------+
|RecordType| CITY|     key1|     key2|     key3|primayKEY|
+----------+-----+---------+---------+---------+---------+
|        00|DELHI|      111|      222|      333|111222333|
|        00|   SP|123123123|231231231|312312312|     null|
+----------+-----+---------+---------+---------+---------+

【讨论】:

谢谢 Roth,你能否详细说明第二行 (.map(k => k.map(entireRow.getAs[String](_))) 没有得到这个。 @rupeshkumar 第一个映射在Option 上,第二个映射迭代包含主键列名的Seq[String],然后使用@987654326 从entireRow 中提取这些字段Row上的@方法@

以上是关于在 spark 数据框中运行 UDF 时,不支持获取 org.apache.spark.sql.Column 类型的架构的主要内容,如果未能解决你的问题,请参考以下文章

注册 UDF 时出现 Spark 错误:不支持 AnyRef 类型的架构

Scala Spark 中的 udf 运行时错误

使用 UDF 及其性能的 Spark Scala 数据集验证

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列

Scala Spark - 不支持 udf 列

使用 udf 以编程方式从数据框中选择列