在 Spark 数据框中爆炸嵌套结构

Posted

技术标签:

【中文标题】在 Spark 数据框中爆炸嵌套结构【英文标题】:Exploding nested Struct in Spark dataframe 【发布时间】:2017-01-09 14:11:09 【问题描述】:

我正在研究一个 Databricks 示例。数据框的架构如下所示:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)

在示例中,他们展示了如何将员工列分解为 4 个附加列:

val explodeDF = parquetDF.explode($"employees")  
case Row(employee: Seq[Row]) => employee.map employee =>
  val firstName = employee(0).asInstanceOf[String]
  val lastName = employee(1).asInstanceOf[String]
  val email = employee(2).asInstanceOf[String]
  val salary = employee(3).asInstanceOf[Int]
  Employee(firstName, lastName, email, salary)
 
.cache()
display(explodeDF)

我将如何对部门列做类似的事情(即向数据框中添加两个名为“id”和“name”的附加列)?方法不完全相同,我只能弄清楚如何使用以下方法创建一个全新的数据框:

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

如果我尝试:

val explodeDF = parquetDF.explode($"department")  
  case Row(dept: Seq[String]) => dept.mapdept => 
  val id = dept(0) 
  val name = dept(1)
   
.cache()
display(explodeDF)

我收到警告和错误:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.mapdept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department")  
                                   ^

【问题讨论】:

【参考方案1】:

在我看来,最优雅的解决方案是使用 select 运算符对 Struct 进行星号扩展,如下所示:

var explodedDf2 = explodedDf.select("department.*","*")

https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

【讨论】:

【参考方案2】:

你可以使用类似的东西:

var explodeDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))

你帮助我解决了这些问题:

Flattening Rows in Spark Spark 1.4.1 DataFrame explode list of JSON objects

【讨论】:

A 阶段失败:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 41.0 中的任务 0 失败 4 次,最近一次失败:阶段 41.0 中的任务 0.3 丢失(TID 1403, 10.81.214.49): scala.MatchError: [[789012,Mechanical Engineering]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) @Feynman27 this 有帮助吗?它似乎与 您的 尝试相匹配。我认为我的答案的问题是employees 也有一个元素,而department 没有。 是的,employees 示例创建新行,而department 示例应该只创建两个新列。 相关问题:***.com/questions/30008127/… 我们可以一次重命名所有嵌套列吗?例如,department.id -> inner_id, department.name -> inner_name, ...【参考方案3】:

这似乎可行(尽管可能不是最优雅的解决方案)。

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))

【讨论】:

你可以val explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id")).withColumn("name", explodeDF2("department.name"))

以上是关于在 Spark 数据框中爆炸嵌套结构的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala 在 Spark 中爆炸嵌套结构

如何在 Spark 数据框中使用嵌套列进行连接

在 spark 数据框中的嵌套 json 中将部分父 Schema 列添加到子项

使用 spark-xml 从 pyspark 数据框中选择嵌套列

Spark Scala,如何检查数据框中是不是存在嵌套列

如何在spark scala数据框中更新嵌套列的xml值