在 Scala 中用不同的 DataType 展平一个 DataFrame
Posted
技术标签:
【中文标题】在 Scala 中用不同的 DataType 展平一个 DataFrame【英文标题】:Flatten a DataFrame in Scala with different DataTypes inside 【发布时间】:2018-01-01 19:36:48 【问题描述】:您可能知道,DataFrame 可以包含复杂类型的字段,例如结构 (StructType) 或数组 (ArrayType)。在我的情况下,您可能需要将所有 DataFrame 数据映射到 Hive 表,并使用简单的类型字段(字符串、整数......)。 我为这个问题苦苦挣扎了很长时间,终于找到了我想分享的解决方案。 另外,我相信它可以改进,所以请随时回复您自己的建议。
它基于this thread,但也适用于 ArrayType 元素,而不仅仅是 StructType 元素。 它是一个尾递归函数,它接收一个 DataFrame,并将其展平返回。
def flattenDf(df: DataFrame): DataFrame =
var end = false
var i = 0
val fields = df.schema.fields
val fieldNames = fields.map(f => f.name)
val fieldsNumber = fields.length
while (!end)
val field = fields(i)
val fieldName = field.name
field.dataType match
case st: StructType =>
val childFieldNames = st.fieldNames.map(n => fieldName + "." + n)
val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
val newDf = df.selectExpr(newFieldNames: _*)
return flattenDf(newDf)
case at: ArrayType =>
val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode($fieldName) as a")
val fieldNamesToSelect = fieldNamesExcludingArray ++ Array("a.*")
val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
val explodedAndSelectedDf = explodedDf.selectExpr(fieldNamesToSelect: _*)
return flattenDf(explodedAndSelectedDf)
case _ => Unit
i += 1
end = i >= fieldsNumber
df
【问题讨论】:
对于初学者,val fieldNames = df.schema.fieldNames
:D
@RameshMaharjan 试试看.. 工作得很好。
【参考方案1】:
val df = Seq(("1", (2, (3, 4)),Seq(1,2))).toDF()
df.printSchema
root
|-- _1: string (nullable = true)
|-- _2: struct (nullable = true)
| |-- _1: integer (nullable = false)
| |-- _2: struct (nullable = true)
| | |-- _1: integer (nullable = false)
| | |-- _2: integer (nullable = false)
|-- _3: array (nullable = true)
| |-- element: integer (containsNull = false)
def flattenSchema(schema: StructType, fieldName: String = null) : Array[Column] =
schema.fields.flatMap(f =>
val cols = if (fieldName == null) f.name else (fieldName + "." + f.name)
f.dataType match
case structType: StructType => fattenSchema(structType, cols)
case arrayType: ArrayType => Array(explode(col(cols)))
case _ => Array(col(cols))
)
df.select(flattenSchema(df.schema) :_*).printSchema
root
|-- _1: string (nullable = true)
|-- _1: integer (nullable = true)
|-- _1: integer (nullable = true)
|-- _2: integer (nullable = true)
|-- col: integer (nullable = false)
【讨论】:
以上是关于在 Scala 中用不同的 DataType 展平一个 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章