如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?
Posted
技术标签:
【中文标题】如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?【英文标题】:How to change the schema of a DataFrame (to fix the names of some nested fields)? 【发布时间】:2017-07-20 14:57:17 【问题描述】:我有一个问题,当我们将 Json 文件加载到 Spark 中时,将其存储为 Parquet,然后尝试从 Impala 访问 Parquet 文件; Impala 抱怨列的名称,因为它们包含在 SQL 中非法的字符。
JSON 文件的“特点”之一是它们没有预定义的架构。我希望 Spark 创建架构,然后我必须修改包含非法字符的字段名称。
我的第一个想法是在 DataFrame 中的字段名称上使用withColumnRenamed
,但这仅适用于我认为的***字段,因此我无法使用它,因为 Json 包含嵌套数据。
所以我创建了以下代码来重新创建 DataFrames 架构,递归地遍历该结构。然后我使用这个新模式重新创建 DataFrame。
(根据 Jacek 对使用 Scala 复制构造函数的建议进行了更新。)
def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType =
StructType(schema.fields.map field =>
field.dataType match
case struct: StructType =>
field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
case _ =>
field.copy(name = replaceIllegal(field.name))
)
sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))
这行得通。但是有没有更好/更简单的方法来实现我想做的事情?
有没有更好的方法来替换 DataFrame 上的现有架构?以下代码不起作用:
df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))
它给出了这个错误:
org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'
【问题讨论】:
【参考方案1】:我认为最好的办法是将数据集(在保存为 parquet 文件之前)转换为 RDD,并使用您的自定义架构来描述您想要的结构。
val targetSchema: StructType = ...
val fromJson: DataFrame = ...
val targetDataset = spark.createDataFrame(fromJson.rdd, targetSchema)
请参阅SparkSession.createDataFrame 中的示例作为参考,但是当您要从数据集创建它时,它直接使用 RDD。
val schema =
StructType(
StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
val people =
sc.textFile("examples/src/main/resources/people.txt").map(
_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sparkSession.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)
但正如你在评论中提到的(我后来合并到你的问题中):
JSON 文件没有预定义的架构。
话虽如此,我认为您的解决方案是正确的。 Spark 没有提供任何开箱即用的类似功能,我认为它更多的是关于开发自定义 Scala 代码,该代码将遍历 StructType
/StructField
树并更改不正确的内容。
我建议在您的代码中更改的是使用 copy
构造函数(Scala 案例类的一个特性 - 请参阅 A Scala case class ‘copy’ method example),它只会更改不正确的名称,而其他属性不受影响。
使用copy
构造函数将(大致)对应于以下代码:
// was
// case s: StructType =>
// StructField(replaceIllegal(field.name), removeIllegalCharsInColumnNames(s), field.nullable, field.metadata)
s.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(s))
函数式语言(通常)和 Scala(特别是)中有一些设计模式可以处理深层嵌套结构操作,但这可能太多了(我不愿分享它)。
因此,我认为问题在于其当前的“形态”,更多的是关于如何将树作为数据结构而不是 Spark 模式来操作。
【讨论】:
谢谢@Jacek - 还有没有更好的方法来应用新模式?我认为代码 "df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))" 应该可以工作,但使用 * 是不允许的。有没有办法选择DataFrame的根? 您可以使用df.schema
创建一个新架构(不是select("*"...
),这正是您想要的,但可以采用不同的方式。以上是关于如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?的主要内容,如果未能解决你的问题,请参考以下文章
如何修复 PHP 嵌套循环我在尝试创建嵌套循环时遇到了一些问题?
从深度嵌套的 JSON 创建 Pandas DataFrame