UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段

Posted

技术标签:

【中文标题】UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段【英文标题】:UDF to cast a map<bigint,struct<in1:bigint,in2:string>> column to add more fields to inner struct 【发布时间】:2020-07-29 16:42:53 【问题描述】:

我有一个 hive 表,当读入 spark 为 spark.table(&lt;table_name&gt;) 具有以下结构时:

scala> df.printSchema
root
 |-- id: long (nullable = true)
 |-- info: map (nullable = true)
 |    |-- key: long
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- in1: long (nullable = true)
 |    |    |-- in2: string (nullable = true)

我想转换映射列以向内部结构添加更多字段,例如in3,in4 在这个例子中:map&lt;bigint,struct&lt;in1:bigint,in2:string,in3:decimal(18,5),in4:string&gt;&gt; 我尝试过正常的演员阵容,但这不起作用。所以我正在检查是否可以通过 UDF 实现这一目标。 我将为这些新值分配默认值,例如十进制的 0 和字符串的 ""。 以下是尝试过但无法正常工作的方法。谁能建议我如何实现这一目标?

val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
val newStructType = origStructType.add("in1", LongType, nullable = true).add("in2", StringType, nullable = true).add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
val newColSchema = MapType(LongType, newStructType)      

val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
val df = Seq((100L,m)).toDF("id","info")
val typeUDFNewRet = udf((col1: Map[Long,Seq[(Long,String)]]) => 
    col1.mapValues(v => Seq(v(0),v(1),null,"")) //Forced to use null here for another issue
  , newColSchema)
spark.udf.register("typeUDFNewRet",typeUDFNewRet)
df.registerTempTable("op1")
val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
scala> val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
df2: org.apache.spark.sql.DataFrame = [id: bigint, UDF(info): map<bigint,struct<in1:bigint,in2:string,in1:bigint,in2:string,in3:decimal(18,5),in4:string>>]

scala> df2.show(false)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.Seq
  at $anonfun$1$$anonfun$apply$1.apply(<console>:43)
  at scala.collection.MapLike$MappedValues$$anonfun$iterato

我也尝试使用 this answer 作为 Row 返回,但这会产生差异问题。

【问题讨论】:

【参考方案1】:

试试这个-

 val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
    val newStructType = origStructType.add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
    val newColSchema = MapType(LongType, newStructType)

    val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
    val df = Seq((100L,m)).toDF("id","info")
    df.show(false)
    df.printSchema()
    val typeUDFNewRet = udf((col1: Map[Long,Row]) => 
      col1.mapValues(r => Row.merge(r, Row(null, ""))) //Forced to use null here for another issue
    , newColSchema)
    spark.udf.register("typeUDFNewRet",typeUDFNewRet)
    df.registerTempTable("op1")
    val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")

    df2.show(false)
    df2.printSchema()

    /**
      * +---+----------------------------------------------+
      * |id |UDF(info)                                     |
      * +---+----------------------------------------------+
      * |100|[101 -> [101, val2,, ], 102 -> [102, val3,, ]]|
      * +---+----------------------------------------------+
      *
      * root
      * |-- id: long (nullable = false)
      * |-- UDF(info): map (nullable = true)
      * |    |-- key: long
      * |    |-- value: struct (valueContainsNull = true)
      * |    |    |-- in1: long (nullable = true)
      * |    |    |-- in2: string (nullable = true)
      * |    |    |-- in3: decimal(18,5) (nullable = true)
      * |    |    |-- in4: string (nullable = true)
      */

【讨论】:

感谢@Somehwar,这很好用。但是,如果我在 UDF 中使用非空/空白十进制值,例如 col1.mapValues(r =&gt; Row.merge(r, Row(0.001, "hello"))) 它会给出异常 scala.MatchError: 0.001 (of class java.lang.Double) 。知道如何解决吗? 好吧,我可以使用Decimal(0.001) 让它工作。所以我标记为已回答。谢谢。 是的,使用new BigDecimal(..) 表示十进制。如果有帮助,请随时投票+接受

以上是关于UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf

在 Hadoop UDF 输出中保留列数据类型(流)

编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java

编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java

Hive Generic UDF:Hive 未按预期进行转换,原因是:java.lang.ClassCastException:java.util.ArrayList 无法转换为 java.util.

在不使用 UDF 的情况下基于映射转换 Spark DataFrame 中的列