从 UDF 内的 Spark SQL 行中提取嵌套数组

Posted

技术标签:

【中文标题】从 UDF 内的 Spark SQL 行中提取嵌套数组【英文标题】:Extract a nested array from a Spark SQL Row inside a UDF 【发布时间】:2018-07-13 11:46:02 【问题描述】:

我正在使用 DataFrame,需要提取数据。 我有很多嵌套的关卡,所以我在第一关使用了爆炸和选择,然后我将 UDF 用于嵌套关卡。

我有一个采用 $"Root.Obj" 的 UDF,它是一个数组,我希望它返回一个 Array[MyObj]。 我的输出类:

case class MyObj(fieldA: Boolean, fieldB: String, fieldC: Array[MyNested])
case class MyNested(field1: Long, field2: String)

简而言之,这是输入模式:

 |-- Obj: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FieldA: boolean (nullable = true)
 |    |    |-- FieldB: string (nullable = true)
 |    |    |-- FieldC: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Field1: long (nullable = true)
 |    |    |    |    |-- Field2: string (nullable = true)
 |    |    |-- FieldD: boolean (nullable = true)

我的 UDF:

def extractObjs: UserDefinedFunction = udf 
  (objs: Seq[Row]) ⇒
    objs.map 
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.get???("FieldC"))
        )
    


def extractNested(nesteds: ???): Array[MyNested] = 
  ???

这是更复杂的 IRL,因为我需要从其他地方检索值并且有更多嵌套数组。 此外,Obj 和 FieldC 的输入结构比这里复杂得多,我不能(或不想)为它们创建案例类。由于我需要在多个地方执行此操作,假设我不知道 FieldC 元素的“结构”。

我的问题是提取“FieldC”数组。我想要一个 Seq[Row] 但我无法获得它,getStruct 只给我一个 Row,然后 getSeq[Row] 抛出错误,因为scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row

【问题讨论】:

你应该将 fieldC 提取为Seq[Row] 【参考方案1】:

结构体映射到UDF中的Row,所以结构体数组可以被Seq[Row]访问,例如:

def extractObjs: UserDefinedFunction = udf 
  (objs: Seq[Row]) ⇒
    objs.map 
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.getAs[Seq[Row]]("FieldC"))
        )
    


def extractNested(nesteds: Seq[Row]): Array[MyNested] = 
  nesteds.map(r => MyNested(r.getAs[Long]("Field1"),r.getAs[String]("Field2"))).toArray

【讨论】:

当我这样做时,正如我在帖子末尾所说的那样,我遇到了演员表错误。 我不知道我在测试中哪里弄错了,但它现在确实在工作。谢谢

以上是关于从 UDF 内的 Spark SQL 行中提取嵌套数组的主要内容,如果未能解决你的问题,请参考以下文章

尝试从 UDF 执行 spark sql 查询

Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF

Scala 和 Spark UDF 函数

如何从spark中的嵌套结构类型中提取列名和数据类型

使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?

使用 Scala 从 Spark 的 withColumn 中调用 udf 时出错