Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射
Posted
技术标签:
【中文标题】Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射【英文标题】:Spark SQL UDF returning scala immutable Map with df.WithColumn() 【发布时间】:2016-07-21 14:44:47 【问题描述】:我有案例课
case class MyCaseClass(City : String, Extras : Map[String, String])
和返回 scala.collection.immutable.Map 的用户定义函数
def extrasUdf = spark.udf.register(
"extras_udf",
(age : Int, name : String) => Map("age" -> age.toString, "name" -> name)
)
但这会因异常而中断:
import spark.implicits._
spark.read.options(...).load(...)
.select('City, 'Age, 'Name)
.withColumn("Extras", extrasUdf('Age, 'Name))
.drop('Age)
.drop('Name)
.as[MyCaseClass]
我应该使用 spark sql 的 MapType(DataTypes.StringType, DataTypes.IntegerType) 但我找不到任何工作示例...
如果我使用 scala.collection.Map 但我需要不可变的 Map
【问题讨论】:
【参考方案1】:你的代码有很多问题:
您正在使用def extrastUdf =
,它创建了一个用于注册 UDF 的函数,而不是实际创建/注册 UDF。请改用val extrasUdf =
。
Any
。您至少可以做两件事:(a) 切换到使用字符串映射(使用 age.toString
,在这种情况下您不需要 UDF,因为您可以简单地使用 map()
)或 (b) 切换到使用命名使用named_struct()
的结构(同样,不需要UDF)。通常,仅当您无法对现有函数执行所需的操作时,才编写 UDF。我更喜欢查看 Hive 文档,因为 Spark 文档相当稀疏。
另外,请记住,Spark 模式中的类型规范(例如,MapType
)与 Scala 类型(例如,Map[_, _]
)完全不同,并且与类型在内部表示以及在 Scala 和火花数据结构。换句话说,这与可变集合与不可变集合无关。
希望这会有所帮助!
【讨论】:
是否可以在没有/使用 UDF 的情况下放入 WithColumn() 表达式,以便仅当它们不为空时才将 'Age、'Name 放入地图中,因为我不想在地图中有空值? 您可以使用if()
或case when ... then
表达式。以上是关于Spark SQL UDF 使用 df.WithColumn() 返回 scala 不可变映射的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF
如何在 Spark SQL(DataFrame)的 UDF 中使用常量值