Spark parquet 模式演变
Posted
技术标签:
【中文标题】Spark parquet 模式演变【英文标题】:Spark parquet schema evolution 【发布时间】:2020-03-17 17:21:06 【问题描述】:我有一个分区的 hdfs parquet 位置,它具有不同的架构是不同的分区。
说第一个分区有 5 列,第二个分区有 4 个列。现在我尝试读取基本 Parquet 路径,然后过滤第二个分区。
这在 DF 中为我提供了 5 列,即使我在第二分区的 Parquet 文件中只有 4 列。 当我直接读取第二个分区时,它给出了正确的 4 列。如何解决这个问题。
【问题讨论】:
【参考方案1】:您可以在读取 parquet 文件时指定所需的schema(4 columns)
!
schema
中包含的字段,如果数据中不存在该字段则返回null
。
Example:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sch=new StructType().add("i",IntegerType).add("z",StringType)
spark.read.schema(sch).parquet("<parquet_file_path>").show()
//here i have i in my data and not have z field
//+---+----+
//| i| z|
//+---+----+
//| 1|null|
//+---+----+
【讨论】:
我正在应用我自己的架构,但它在第二个分区中只有 4 列可用。我正在阅读具有多个分区的基本目录。我的 DF 模式有 5 个列作为具有 5 个模式的分区 1 文件。即使只过滤了分区 2,我的 DF 中仍然有 5 个列 阅读时是否添加了.schema(<new_schema>)
然后阅读 parquet 目录?只有当我们指定 .schema 选项时,parquet 才会提取指定模式的数据,否则它将读取所有字段!
不,我不会在阅读自身时添加架构。它推断架构,然后我将其转换为 rdd,进行一些处理,最后创建一个带有架构的数据框。在这个模式中,我只给出 4 个列,但 rdd 已经有 5 个列。因此,如果我有 100 列,数据将从第 5 列转移【参考方案2】:
我真的很想帮助你,但我不确定你真正想要实现什么。您对此有何打算?
如果您要读取包含所有分区的 parquet 文件,并且只想获取两个分区都有的列,那么读取选项“mergeSchema”可能适合您的需要。
与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演化。用户可以从一个简单的模式开始,然后根据需要逐渐向模式中添加更多列。这样,用户最终可能会得到多个 Parquet 文件,这些文件具有不同但相互兼容的模式。 Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一项相对昂贵的操作,而不是 在大多数情况下,我们默认将其关闭,从 1.5.0。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者 将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。
参考spark documentation
所以您使用的是哪个版本的 spark 以及如何设置属性 spark.sql.parquet.mergeSchema
(spark 设置)和 mergeSchema
(客户端)会很有趣
【讨论】:
是的。我只想要所有分区中通用的模式。分区 1 有 5 个列,分区 2 有 4 个列,所以当我读取此文件时,我应该在 DF 中只得到 4 个列,因为分区 1 中的第 5 列在分区 2 中不可用,但我在我的 DF 中得到 5 个列,即使在之后将 DF 过滤为只有分区 2。但是分区 2 中的实际 Parquet 文件没有第 5 列,并且 spark 会为此列插入我不想要的 null。当我过滤分区 2 时,我的 DF 中只需要 4 个列 好的,既然我得到了你,我认为解决这个问题的最简单和最可靠的方法就是 @Shu 刚才所说的。但它不是通用的,我喜欢通用的、可重用的解决方案。在读取分区 Parquet 文件时,您是否已经看过mergeSchema
选项?
我不能给出 .schema 并读取 bcz 我将在读取后转换为 rdd 并操作数据,最后,我将再次使用此处的模式转换为 DF。到目前为止,我已经更改了读取直接分区目录,在这种情况下,我没有在 DF 中获取分区列,因此我手动添加它,但是当我读取一个分区时,它提供了正确的架构,该架构在该分区内可用与以前的方法不同,它提供了额外的模式以上是关于Spark parquet 模式演变的主要内容,如果未能解决你的问题,请参考以下文章
为啥 Spark 不能自动检测 Parquet 文件中的新字段?
Spark基础学习笔记25:Spark SQL数据源 - Parquet文件
pyspark.sql.utils.AnalysisException:u'无法推断Parquet的模式。必须手动指定。