How to add null columns to a complex array struct in Spark with a UDF
【Posted】: 2019-07-08 21:20:40
【Question】: I am trying to add a null column to an embedded array[struct] column, so that I can transform a complex column like this one:
case class Additional(id: String, item_value: String)
case class Element(income:String,currency:String,additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income:String,currency:String,additional: Additional2)
val my_uDF = fx.udf((data: Seq[Element]) =>
  data.map(x => new Element2(x.income, x.currency, new Additional2(x.additional.id, x.additional.item_value, null))).seq
)
sparkSession.sqlContext.udf.register("transformElements",my_uDF)
val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")
The goal is to add an extra field called extra2 to Element.Additional, so I do the mapping in a UDF, but it fails with:
org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)
Printing the schema of the "myElements" field shows:
|-- myElements: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- income: string (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- additional: struct (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- item_value: string (nullable = true)
I am trying to transform it into this schema:
|-- myElements: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- income: string (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- additional: struct (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- item_value: string (nullable = true)
| | | |-- extra2: string (nullable = true)
【Comments】:
Couldn't you just use .withColumn on your DF? Why go the UDF and case class route? — Because I am starting from a table.
【Answer 1】: Here is another approach that uses a Dataset instead of a DataFrame, which gives direct access to the objects rather than working with Row. An extra method called asElement2 converts an Element into an Element2.
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)
case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  def asElement2(): Element2 = {
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}
val df = Seq(
  (Seq(Element("150000", "EUR", Additional("001", "500EUR")))),
  (Seq(Element("50000", "CHF", Additional("002", "1000CHF"))))
).toDS()
df.map {
  se => se.map(_.asElement2)
}
// or even simpler
df.map(_.map(_.asElement2))
Output:
+-------------------------------+
|value |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+
Final schema:
root
|-- value: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- income: string (nullable = true)
| | |-- currency: string (nullable = true)
| | |-- additional2: struct (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- item_value: string (nullable = true)
| | | |-- extra2: string (nullable = true)
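Note that because the result of the map is a Dataset[Seq[Element2]] rather than a DataFrame, the column comes out named value, as shown above. If the original column name matters, a toDF call restores it; a minimal sketch, reusing the df and asElement2 defined above:

// Rename the single "value" column back to the original column name.
val renamed = df.map(_.map(_.asElement2)).toDF("myElements")
renamed.printSchema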
【Comments】:
I like your answer. It makes what is going on much clearer to me. Thanks!
【Answer 2】: It is easier to use map to do the necessary transformation on the nested Row elements of the DataFrame and to rename the column with toDF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)
val df = Seq(
  (Seq(Element("70k", "US", Additional("1", "101")), Element("90k", "US", Additional("2", "202")))),
  (Seq(Element("80k", "US", Additional("3", "303"))))
).toDF("myElements")
val df2 = df.map { case Row(s: Seq[Row] @unchecked) => s.map {
    case Row(income: String, currency: String, additional: Row) => additional match {
      case Row(id: String, item_value: String) =>
        Element2(income, currency, Additional2(id, item_value, null))
    }
  }
}.toDF("myElements")
df2.show(false)
// +--------------------------------------------+
// |myElements |
// +--------------------------------------------+
// |[[70k, US, [1, 101,]], [90k, US, [2, 202,]]]|
// |[[80k, US, [3, 303,]]] |
// +--------------------------------------------+
df2.printSchema
// root
// |-- myElements: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- income: string (nullable = true)
// | | |-- currency: string (nullable = true)
// | | |-- additional: struct (nullable = true)
// | | | |-- id: string (nullable = true)
// | | | |-- item_value: string (nullable = true)
// | | | |-- extra2: string (nullable = true)
If a UDF is preferred for some reason, the required transformation is essentially the same. Note that the UDF has to accept Seq[Row] rather than Seq[Element]: Spark passes the elements of an array of structs to a Scala UDF as Row objects, which is what made the UDF in the question fail:
val myUDF = udf((s: Seq[Row]) => s.map {
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
})
val df2 = df.select(myUDF($"myElements").as("myElements"))
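As the question comments ask, newer Spark versions can also do this without a UDF or case classes at all, using only column functions. A minimal sketch, assuming Spark 3.1+ (transform needs 3.0+, Column.withField needs 3.1+) and the df and imports defined above:

// Rewrite each struct in the array, adding a null string field "extra2" to
// the nested "additional" struct; no UDF and no case classes needed.
val df3 = df.withColumn("myElements",
  transform($"myElements", el =>
    el.withField("additional.extra2", lit(null).cast("string"))))
df3.printSchema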