在 Scala 中用不同的 DataType 展平一个 DataFrame

Posted

技术标签:

【中文标题】在 Scala 中用不同的 DataType 展平一个 DataFrame【英文标题】:Flatten a DataFrame in Scala with different DataTypes inside 【发布时间】:2018-01-01 19:36:48 【问题描述】:

您可能知道,DataFrame 可以包含复杂类型的字段,例如结构 (StructType) 或数组 (ArrayType)。在我的情况下,您可能需要将所有 DataFrame 数据映射到 Hive 表,并使用简单的类型字段(字符串、整数......)。 我为这个问题苦苦挣扎了很长时间,终于找到了我想分享的解决方案。 另外,我相信它可以改进,所以请随时回复您自己的建议。

它基于this thread,但也适用于 ArrayType 元素,而不仅仅是 StructType 元素。 它是一个尾递归函数,它接收一个 DataFrame,并将其展平返回。

def flattenDf(df: DataFrame): DataFrame = 
  var end = false
  var i = 0
  val fields = df.schema.fields
  val fieldNames = fields.map(f => f.name)
  val fieldsNumber = fields.length

  while (!end) 
    val field = fields(i)
    val fieldName = field.name

    field.dataType match 
      case st: StructType =>
        val childFieldNames = st.fieldNames.map(n => fieldName + "." + n)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val newDf = df.selectExpr(newFieldNames: _*)
        return flattenDf(newDf)
      case at: ArrayType =>
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode($fieldName) as a")
        val fieldNamesToSelect = fieldNamesExcludingArray ++ Array("a.*")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        val explodedAndSelectedDf = explodedDf.selectExpr(fieldNamesToSelect: _*)
        return flattenDf(explodedAndSelectedDf)
      case _ => Unit
    

    i += 1
    end = i >= fieldsNumber
  
  df

【问题讨论】:

对于初学者,val fieldNames = df.schema.fieldNames:D @RameshMaharjan 试试看.. 工作得很好。 【参考方案1】:

val df = Seq(("1", (2, (3, 4)),Seq(1,2))).toDF()

df.printSchema

root
 |-- _1: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: struct (nullable = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)
 |-- _3: array (nullable = true)
 |    |-- element: integer (containsNull = false)


def flattenSchema(schema: StructType, fieldName: String = null) : Array[Column] = 
   schema.fields.flatMap(f => 
     val cols = if (fieldName == null) f.name else (fieldName + "." + f.name)
     f.dataType match 
       case structType: StructType => fattenSchema(structType, cols)
       case arrayType: ArrayType => Array(explode(col(cols)))
       case _ => Array(col(cols))
     
   )
 

df.select(flattenSchema(df.schema) :_*).printSchema

root
 |-- _1: string (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _2: integer (nullable = true)
 |-- col: integer (nullable = false)

【讨论】:

以上是关于在 Scala 中用不同的 DataType 展平一个 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Scala嵌套数组展平

Scala中的嵌套与展平模式匹配

Scala 展平深度函数混淆

如何在 Scala 中展平期货列表

在pyspark中展平嵌套的json scala代码

所有 Spark SQL DataType 的 Scala 类型映射是啥