Adding a nested column to Spark DataFrame

Posted: 2017-01-18 23:02:10

Question:

How can I add or replace fields of a struct at any nesting level?

This input:

val rdd = sc.parallelize(Seq(
  """{"a": {"xX": 1,"XX": 2},"b": {"z": 0}}""",
  """{"a": {"xX": 3},"b": {"z": 0}}""",
  """{"a": {"XX": 3},"b": {"z": 0}}""",
  """{"a": {"xx": 4},"b": {"z": 0}}"""))
var df = sqlContext.read.json(rdd)

produces the following schema:

root
 |-- a: struct (nullable = true)
 |    |-- XX: long (nullable = true)
 |    |-- xX: long (nullable = true)
 |    |-- xx: long (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)

Then I can do this:

import org.apache.spark.sql.functions._
val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
df = df
  .withColumn("a_xx",
    coalesce(overlappingNames:_*))
  .dropNestedColumn("a.xX")
  .dropNestedColumn("a.XX")
  .dropNestedColumn("a.xx")

(dropNestedColumn is borrowed from this answer: https://***.com/a/39943812/1068385. I'm essentially looking for its inverse operation.)

The schema becomes:

root
 |-- a: struct (nullable = false)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)
 |-- a_xx: long (nullable = true)

Obviously, this doesn't replace (or add) a.xx; it adds a new field a_xx at the root level instead.

I would like to be able to do this instead:

val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
df = df
  .withNestedColumn("a.xx",
    coalesce(overlappingNames:_*))
  .dropNestedColumn("a.xX")
  .dropNestedColumn("a.XX")

so that it would result in this schema:

root
 |-- a: struct (nullable = false)
 |    |-- xx: long (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- z: long (nullable = true)

How can I achieve that?
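For what it's worth, recent Spark versions have this built in: since Spark 3.1, Column.withField replaces or adds a nested field in place, and Column.dropFields removes one. A minimal sketch (assuming Spark 3.1+, and spark.sql.caseSensitive=true so the three spellings stay distinct fields):

import org.apache.spark.sql.functions.{coalesce, col}

// Replace a.xx inside the struct and drop the other two spellings.
df = df.withColumn("a",
  col("a")
    .withField("xx", coalesce(col("a.xx"), col("a.xX"), col("a.XX")))
    .dropFields("xX", "XX"))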

The actual goal here is to be case-insensitive about column names in the input JSON. The last step is then simple: collect all overlapping column names and apply a coalesce over each group, along the lines of the sketch below.
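A sketch of that collection step (hypothetical helper code; it assumes a is a struct column and that a withNestedColumn like the one asked for above exists):

import org.apache.spark.sql.functions.{coalesce, col}
import org.apache.spark.sql.types.StructType

// Group the fields of struct "a" by lower-cased name; each group of
// case-overlapping spellings collapses into a single coalesce.
val fieldNames = df.schema("a").dataType.asInstanceOf[StructType].fieldNames
val coalesced = fieldNames.groupBy(_.toLowerCase).map { case (lower, names) =>
  lower -> coalesce(names.map(n => col(s"a.$n")): _*)
}
// Each (lower, column) pair can then be fed to withNestedColumn(s"a.$lower", column).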

Comments:

Did you get a solution?

@ShankarKoirala: Not with Spark. In Hive it was trivial to achieve what I wanted using COALESCE.

Answer 1:

It may not be as elegant or as efficient as it could be, but here is what I came up with:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}

object DataFrameUtils {
  // Wrap a column so that it stays null when its parent struct is null.
  private def nullableCol(parentCol: Column, c: Column): Column = {
    when(parentCol.isNotNull, c)
  }

  private def nullableCol(c: Column): Column = {
    nullableCol(c, c)
  }

  // Build the chain of single-field structs for the remaining path segments.
  private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = {
    splitted
      .foldRight(newCol) {
        case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName))
      }
  }

  // Rebuild the struct at each level, replacing or appending the target field.
  private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = {
    colType match {
      case colType: StructType if splitted.nonEmpty =>
        var modifiedFields: Seq[(String, Column)] = colType.fields
          .map { f =>
            var curCol = col.getField(f.name)
            if (f.name == splitted.head) {
              curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol)
            }
            (f.name, curCol as f.name)
          }

        // The target field does not exist yet: append it to the struct.
        if (!modifiedFields.exists(_._1 == splitted.head)) {
          modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head)
        }

        var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*)
        if (nullable) {
          modifiedStruct = nullableCol(col, modifiedStruct)
        }
        modifiedStruct

      // Leaf (or non-struct) reached: build the remaining path from scratch.
      case _ => createNestedStructs(splitted, newCol)
    }
  }

  private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = {
    if (newColName.contains('.')) {
      val splitted = newColName.split('.')

      val modifiedOrAdded: (String, Column) = df.schema.fields
        .find(_.name == splitted.head)
        .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol)))
        .getOrElse {
          (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head)
        }

      df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2)
    } else {
      // Top-level addition: use the Spark method as-is.
      df.withColumn(newColName, newCol)
    }
  }

  implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
      * Add a nested field to a DataFrame.
      *
      * @param newColName Dot-separated nested field name
      * @param newCol     New column value
      */
    def withNestedColumn(newColName: String, newCol: Column): DataFrame = {
      DataFrameUtils.addNestedColumn(df, newColName, newCol)
    }
  }
}

Please feel free to improve it.

val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }"""))
val df: DataFrame = spark.read.json(data)

val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2")

It should produce:

assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.schema.simpleString)
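Applied back to the original question, the extension would be used like this (a sketch; it assumes the dropNestedColumn from the answer linked in the question is also in scope):

import org.apache.spark.sql.functions.{coalesce, col}
import DataFrameUtils._

df = df
  .withNestedColumn("a.xx", coalesce(col("a.xx"), col("a.xX"), col("a.XX")))
  .dropNestedColumn("a.xX")
  .dropNestedColumn("a.XX")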

Comments:

@MichelLemay It works for the case in the question, thanks. I'm trying to apply it to nested arrays of structs and it fails; that's a bit too far beyond my actual Spark knowledge... Could you help me?

Indeed, that's not functionality we needed, so I left it as a future improvement. To support it with the current code, case _ would have to be modified to handle arrays of nested structs. Nested simple types would also need to be promoted to structs. Additionally, arrays would need to be supported in newCol, and a possibly different number of elements in the target array handled.
