Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射

Posted

技术标签:

【中文标题】Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射【英文标题】:Spark SQL UDF returning scala immutable Map with df.WithColumn() 【发布时间】:2016-07-21 14:44:47 【问题描述】:

我有案例课

case class MyCaseClass(City : String, Extras : Map[String, String])

和返回 scala.collection.immutable.Map 的用户定义函数

def extrasUdf = spark.udf.register(
   "extras_udf", 
   (age : Int, name : String) => Map("age" -> age.toString, "name" -> name)
)

但这会因异常而中断:

import spark.implicits._

spark.read.options(...).load(...)
      .select('City, 'Age, 'Name)
      .withColumn("Extras", extrasUdf('Age, 'Name))
      .drop('Age)
      .drop('Name)
      .as[MyCaseClass]

我应该使用 spark sql 的 MapType(DataTypes.StringType, DataTypes.IntegerType) 但我找不到任何工作示例...

如果我使用 scala.collection.Map 但我需要不可变的 Map

【问题讨论】:

【参考方案1】:

你的代码有很多问题:

您正在使用def extrastUdf =,它创建了一个用于注册 UDF 的函数,而不是实际创建/注册 UDF。请改用val extrasUdf =

1234563 Spark 不支持Any。您至少可以做两件事:(a) 切换到使用字符串映射(使用 age.toString,在这种情况下您不需要 UDF,因为您可以简单地使用 map())或 (b) 切换到使用命名使用named_struct() 的结构(同样,不需要UDF)。通常,仅当您无法对现有函数执行所需的操作时,才编写 UDF。我更喜欢查看 Hive 文档,因为 Spark 文档相当稀疏。

另外,请记住,Spark 模式中的类型规范(例如,MapType)与 Scala 类型(例如,Map[_, _])完全不同,并且与类型在内部表示以及在 Scala 和火花数据结构。换句话说,这与可变集合与不可变集合无关。

希望这会有所帮助!

【讨论】:

是否可以在没有/使用 UDF 的情况下放入 WithColumn() 表达式,以便仅当它们不为空时才将 'Age、'Name 放入地图中,因为我不想在地图中有空值? 您可以使用if()case when ... then 表达式。

以上是关于Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射的主要内容,如果未能解决你的问题,请参考以下文章

在数据框 API 中使用 spark SQL udf

Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF

如何在 Spark SQL(DataFrame)的 UDF 中使用常量值

Spark - Hive UDF 与 Spark-SQL 一起使用,但不与 DataFrame 一起使用

尝试从 UDF 执行 spark sql 查询

使用 scala 在 spark sql 中编写 UDF