How to add null columns to a complex array struct in Spark with a udf


Asked: 2019-07-08 21:20:40

Question:

I am trying to add a null column to an embedded array[struct] column, so that I can transform a complex column like this one:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val  my_uDF = fx.udf((data: Seq[Element]) => 
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  )
  sparkSession.sqlContext.udf.register("transformElements",my_uDF)
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional, so I map over the field in the UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema of the myElements field, it shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

I am trying to transform it into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)

Comments:

Could you not just use .withColumn on your DF? Why go down the UDF and case class route?

Because I start from a table. f2 only has this extra field from 2019 onward, and before that year only f1 is reported, so I need a union like this: (select f2 from table where year >= 2019 union select f1 from table where year …

Just another approach: if the two schemas are identical except that one has an additional column, you can read the first table, the one with the extra field, take its schema, and read the second table with that schema. Spark will inherently use null for fields defined in the schema for which it has no data. For speed, I would try to avoid the UDF route.

Thanks Aaron. The union is a mandatory query because it is issued from a BI tool; it cannot be Spark code. That is why a registered UDF can be called from that tool.

Wait, you are calling the udf from a BI tool? Am I understanding that correctly?
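A minimal sketch of the schema-reuse idea from that comment, assuming both tables are stored as Parquet under hypothetical paths (this example is not part of the original thread). Reading the older data with the newer schema makes Spark return null for the missing field:

// Take the schema from the data that already has the extra field
// ("/data/entity_2019" is a hypothetical path).
val schemaWithExtra = spark.read.parquet("/data/entity_2019").schema

// Read the older data with that schema; fields present in the schema but
// absent in the files (extra2) come back as null, no UDF required.
val older = spark.read.schema(schemaWithExtra).parquet("/data/entity_pre2019")

val unioned = spark.read.parquet("/data/entity_2019").unionByName(older)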

Answer 1:

Here is another approach that leverages Datasets rather than DataFrames, giving direct access to the objects instead of working with Row. There is also an extra method called asElement2 that converts an Element into an Element2:

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional) {
  def asElement2: Element2 = {
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}

// assumes a SparkSession named spark is in scope
import spark.implicits._

val df = Seq(
  Seq(Element("150000", "EUR", Additional("001", "500EUR"))),
  Seq(Element("50000", "CHF", Additional("002", "1000CHF")))
).toDS()

df.map { se => se.map(_.asElement2) }

// or even simpler
df.map(_.map(_.asElement2)).show(false)

Output:

+-------------------------------+
|value                          |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+

Final schema:

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional2: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
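A small usage note (not from the original answer): the typed map yields the default column name value, as the output above shows, so a final toDF can restore the original name:

df.map(_.map(_.asElement2)).toDF("myElements")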

Comments:

I like your answer. It makes what is going on much clearer to me. Thanks!

Answer 2:

It is easier to use map to perform the necessary transformation on the nested Row elements of the DataFrame and rename the column via toDF. This also explains the original failure: Spark hands struct data to a Scala function as Row, not as your case class, so pattern matching on Row works where the Seq[Element] signature did not:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

val df = Seq(
  (Seq(Element("70k", "US", Additional("1", "101")), Element("90k", "US", Additional("2", "202")))),
  (Seq(Element("80k", "US", Additional("3", "303"))))
).toDF("myElements")

val df2 = df.map { case Row(s: Seq[Row] @unchecked) => s.map {
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
}}.toDF("myElements")

df2.show(false)
// +--------------------------------------------+
// |myElements                                  |
// +--------------------------------------------+
// |[[70k, US, [1, 101,]], [90k, US, [2, 202,]]]|
// |[[80k, US, [3, 303,]]]                      |
// +--------------------------------------------+

df2.printSchema
// root
//  |-- myElements: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- income: string (nullable = true)
//  |    |    |-- currency: string (nullable = true)
//  |    |    |-- additional: struct (nullable = true)
//  |    |    |    |-- id: string (nullable = true)
//  |    |    |    |-- item_value: string (nullable = true)
//  |    |    |    |-- extra2: string (nullable = true)

If a UDF is preferred for some reason, the required transformation is essentially the same:

val myUDF = udf((s: Seq[Row]) => s.map {
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
})

val df2 = df.select(myUDF($"myElements").as("myElements"))
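Since the comments mention that the call has to come from a BI tool through SQL, the same UDF can also be registered by name, mirroring the registration in the question (this part is not from the original answer; table and column names are reused from the question):

spark.udf.register("transformElements", myUDF)

val result = spark.sql(
  "select transformElements(myElements) as myElements, line_number, country, idate " +
  "from entity where line_number = '1'")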
