UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段
Posted
技术标签:
【中文标题】UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段【英文标题】:UDF to cast a map<bigint,struct<in1:bigint,in2:string>> column to add more fields to inner struct 【发布时间】:2020-07-29 16:42:53 【问题描述】:我有一个 hive 表,当读入 spark 为 spark.table(<table_name>)
具有以下结构时:
scala> df.printSchema
root
|-- id: long (nullable = true)
|-- info: map (nullable = true)
| |-- key: long
| |-- value: struct (valueContainsNull = true)
| | |-- in1: long (nullable = true)
| | |-- in2: string (nullable = true)
我想转换映射列以向内部结构添加更多字段,例如in3,in4
在这个例子中:map<bigint,struct<in1:bigint,in2:string,in3:decimal(18,5),in4:string>>
我尝试过正常的演员阵容,但这不起作用。所以我正在检查是否可以通过 UDF 实现这一目标。
我将为这些新值分配默认值,例如十进制的 0 和字符串的 ""。
以下是尝试过但无法正常工作的方法。谁能建议我如何实现这一目标?
val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
val newStructType = origStructType.add("in1", LongType, nullable = true).add("in2", StringType, nullable = true).add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
val newColSchema = MapType(LongType, newStructType)
val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
val df = Seq((100L,m)).toDF("id","info")
val typeUDFNewRet = udf((col1: Map[Long,Seq[(Long,String)]]) =>
col1.mapValues(v => Seq(v(0),v(1),null,"")) //Forced to use null here for another issue
, newColSchema)
spark.udf.register("typeUDFNewRet",typeUDFNewRet)
df.registerTempTable("op1")
val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
scala> val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
df2: org.apache.spark.sql.DataFrame = [id: bigint, UDF(info): map<bigint,struct<in1:bigint,in2:string,in1:bigint,in2:string,in3:decimal(18,5),in4:string>>]
scala> df2.show(false)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.collection.Seq
at $anonfun$1$$anonfun$apply$1.apply(<console>:43)
at scala.collection.MapLike$MappedValues$$anonfun$iterato
我也尝试使用 this answer 作为 Row 返回,但这会产生差异问题。
【问题讨论】:
【参考方案1】:试试这个-
val origStructType = new StructType().add("in1", LongType, nullable = true).add("in2", StringType, nullable = true)
val newStructType = origStructType.add("in3", DecimalType(18,5), nullable = true).add("in4", StringType, nullable = true)
val newColSchema = MapType(LongType, newStructType)
val m = Map(101L->(101L,"val2"),102L->(102L,"val3"))
val df = Seq((100L,m)).toDF("id","info")
df.show(false)
df.printSchema()
val typeUDFNewRet = udf((col1: Map[Long,Row]) =>
col1.mapValues(r => Row.merge(r, Row(null, ""))) //Forced to use null here for another issue
, newColSchema)
spark.udf.register("typeUDFNewRet",typeUDFNewRet)
df.registerTempTable("op1")
val df2 = spark.sql("select id, typeUDFNewRet(info) from op1")
df2.show(false)
df2.printSchema()
/**
* +---+----------------------------------------------+
* |id |UDF(info) |
* +---+----------------------------------------------+
* |100|[101 -> [101, val2,, ], 102 -> [102, val3,, ]]|
* +---+----------------------------------------------+
*
* root
* |-- id: long (nullable = false)
* |-- UDF(info): map (nullable = true)
* | |-- key: long
* | |-- value: struct (valueContainsNull = true)
* | | |-- in1: long (nullable = true)
* | | |-- in2: string (nullable = true)
* | | |-- in3: decimal(18,5) (nullable = true)
* | | |-- in4: string (nullable = true)
*/
【讨论】:
感谢@Somehwar,这很好用。但是,如果我在 UDF 中使用非空/空白十进制值,例如col1.mapValues(r => Row.merge(r, Row(0.001, "hello")))
它会给出异常 scala.MatchError: 0.001 (of class java.lang.Double)
。知道如何解决吗?
好吧,我可以使用Decimal(0.001)
让它工作。所以我标记为已回答。谢谢。
是的,使用new BigDecimal(..)
表示十进制。如果有帮助,请随时投票+接受以上是关于UDF 转换 map<bigint,struct<in1:bigint,in2:string>> 列以向内部结构添加更多字段的主要内容,如果未能解决你的问题,请参考以下文章
Spark - 如何在 Seq[Map<String,String>] 中的单个字段上应用 udf
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java
编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java
Hive Generic UDF:Hive 未按预期进行转换,原因是:java.lang.ClassCastException:java.util.ArrayList 无法转换为 java.util.