Spark parquet 模式演变

Posted

技术标签:

【中文标题】Spark parquet 模式演变【英文标题】:Spark parquet schema evolution 【发布时间】:2020-03-17 17:21:06 【问题描述】:

我有一个分区的 hdfs parquet 位置,它具有不同的架构是不同的分区。

说第一个分区有 5 列,第二个分区有 4 个列。现在我尝试读取基本 Parquet 路径,然后过滤第二个分区。

这在 DF 中为我提供了 5 列,即使我在第二分区的 Parquet 文件中只有 4 列。 当我直接读取第二个分区时,它给出了正确的 4 列。如何解决这个问题。

【问题讨论】:

【参考方案1】:

您可以在读取 parquet 文件时指定所需的schema(4 columns)

那么spark只读取schema中包含的字段,如果数据中不存在该字段则返回null

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val sch=new StructType().add("i",IntegerType).add("z",StringType)
spark.read.schema(sch).parquet("<parquet_file_path>").show()

//here i have i in my data and not have z field
//+---+----+
//|  i|   z|
//+---+----+
//|  1|null|
//+---+----+

【讨论】:

我正在应用我自己的架构,但它在第二个分区中只有 4 列可用。我正在阅读具有多个分区的基本目录。我的 DF 模式有 5 个列作为具有 5 个模式的分区 1 文件。即使只过滤了分区 2,我的 DF 中仍然有 5 个列 阅读时是否添加了 .schema(&lt;new_schema&gt;) 然后阅读 parquet 目录?只有当我们指定 .schema 选项时,parquet 才会提取指定模式的数据,否则它将读取所有字段! 不,我不会在阅读自身时添加架构。它推断架构,然后我将其转换为 rdd,进行一些处理,最后创建一个带有架构的数据框。在这个模式中,我只给出 4 个列,但 rdd 已经有 5 个列。因此,如果我有 100 列,数据将从第 5 列转移【参考方案2】:

我真的很想帮助你,但我不确定你真正想要实现什么。您对此有何打算?

如果您要读取包含所有分区的 parquet 文件,并且只想获取两个分区都有的列,那么读取选项“mergeSchema”可能适合您的需要。

与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演化。用户可以从一个简单的模式开始,然后根据需要逐渐向模式中添加更多列。这样,用户最终可能会得到多个 Parquet 文件,这些文件具有不同但相互兼容的模式。 Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的模式。

由于模式合并是一项相对昂贵的操作,而不是 在大多数情况下,我们默认将其关闭,从 1.5.0。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者 将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。

参考spark documentation

所以您使用的是哪个版本的 spark 以及如何设置属性 spark.sql.parquet.mergeSchema(spark 设置)和 mergeSchema(客户端)会很有趣

【讨论】:

是的。我只想要所有分区中通用的模式。分区 1 有 5 个列,分区 2 有 4 个列,所以当我读取此文件时,我应该在 DF 中只得到 4 个列,因为分区 1 中的第 5 列在分区 2 中不可用,但我在我的 DF 中得到 5 个列,即使在之后将 DF 过滤为只有分区 2。但是分区 2 中的实际 Parquet 文件没有第 5 列,并且 spark 会为此列插入我不想要的 null。当我过滤分区 2 时,我的 DF 中只需要 4 个列 好的,既然我得到了你,我认为解决这个问题的最简单和最可靠的方法就是 @Shu 刚才所说的。但它不是通用的,我喜欢通用的、可重用的解决方案。在读取分区 Parquet 文件时,您是否已经看过 mergeSchema 选项? 我不能给出 .schema 并读取 bcz 我将在读取后转换为 rdd 并操作数据,最后,我将再次使用此处的模式转换为 DF。到目前为止,我已经更改了读取直接分区目录,在这种情况下,我没有在 DF 中获取分区列,因此我手动添加它,但是当我读取一个分区时,它提供了正确的架构,该架构在该分区内可用与以前的方法不同,它提供了额外的模式

以上是关于Spark parquet 模式演变的主要内容,如果未能解决你的问题,请参考以下文章

显示在 Spark+Parquet 程序中读取的字节数

为啥 Spark 不能自动检测 Parquet 文件中的新字段?

Spark基础学习笔记25:Spark SQL数据源 - Parquet文件

pyspark.sql.utils.AnalysisException:u'无法推断Parquet的模式。必须手动指定。

为啥 spark.read.parquet() 运行 2 个作业?

Parquet + Spark SQL