如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?

Posted

技术标签:

【中文标题】如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?【英文标题】:How to change the schema of a DataFrame (to fix the names of some nested fields)? 【发布时间】:2017-07-20 14:57:17 【问题描述】:

我有一个问题,当我们将 Json 文件加载到 Spark 中时,将其存储为 Parquet,然后尝试从 Impala 访问 Parquet 文件; Impala 抱怨列的名称,因为它们包含在 SQL 中非法的字符。

JSON 文件的“特点”之一是它们没有预定义的架构。我希望 Spark 创建架构,然后我必须修改包含非法字符的字段名称。

我的第一个想法是在 DataFrame 中的字段名称上使用withColumnRenamed,但这仅适用于我认为的***字段,因此我无法使用它,因为 Json 包含嵌套数据。

所以我创建了以下代码来重新创建 DataFrames 架构,递归地遍历该结构。然后我使用这个新模式重新创建 DataFrame。

(根据 Jacek 对使用 Scala 复制构造函数的建议进行了更新。)

def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType = 
  StructType(schema.fields.map  field =>
    field.dataType match 
      case struct: StructType =>
        field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
      case _ =>
        field.copy(name = replaceIllegal(field.name))
    
  )


sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))

这行得通。但是有没有更好/更简单的方法来实现我想做的事情?

有没有更好的方法来替换 DataFrame 上的现有架构?以下代码不起作用:

df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))

它给出了这个错误:

org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'

【问题讨论】:

【参考方案1】:

我认为最好的办法是将数据集(在保存为 parquet 文件之前)转换为 RDD,并使用您的自定义架构来描述您想要的结构。

val targetSchema: StructType = ...
val fromJson: DataFrame = ...
val targetDataset = spark.createDataFrame(fromJson.rdd, targetSchema)

请参阅SparkSession.createDataFrame 中的示例作为参考,但是当您要从数据集创建它时,它直接使用 RDD。

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sparkSession.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

但正如你在评论中提到的(我后来合并到你的问题中):

JSON 文件没有预定义的架构。

话虽如此,我认为您的解决方案是正确的。 Spark 没有提供任何开箱即用的类似功能,我认为它更多的是关于开发自定义 Scala 代码,该代码将遍历 StructType/StructField 树并更改不正确的内容。

我建议在您的代码中更改的是使用 copy 构造函数(Scala 案例类的一个特性 - 请参阅 A Scala case class ‘copy’ method example),它只会更改不正确的名称,而其他属性不受影响。

使用copy 构造函数将(大致)对应于以下代码:

// was
// case s: StructType =>
//    StructField(replaceIllegal(field.name), removeIllegalCharsInColumnNames(s), field.nullable, field.metadata)
s.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(s))

函数式语言(通常)和 Scala(特别是)中有一些设计模式可以处理深层嵌套结构操作,但这可能太多了(我不愿分享它)。

因此,我认为问题在于其当前的“形态”,更多的是关于如何将树作为数据结构而不是 Spark 模式来操作。

【讨论】:

谢谢@Jacek - 还有没有更好的方法来应用新模式?我认为代码 "df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))" 应该可以工作,但使用 * 是不允许的。有没有办法选择DataFrame的根? 您可以使用df.schema 创建一个新架构(不是select("*"...),这正是您想要的,但可以采用不同的方式。

以上是关于如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?的主要内容,如果未能解决你的问题,请参考以下文章

spark 如何从 JSON 推断数字类型?

如何修复 PHP 嵌套循环我在尝试创建嵌套循环时遇到了一些问题?

从嵌套字典创建 Spark DataFrame

从深度嵌套的 JSON 创建 Pandas DataFrame

如何在python,dataframe中将数据转换为嵌套字典

如何将嵌套字典转换为 pandas DataFrame?