一文讲透大数据列存标准格式:Parquet

Posted 网易有数

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文讲透大数据列存标准格式:Parquet相关的知识,希望对你有一定的参考价值。


导读:

今天介绍一种大数据时代有名的列式存储文件格式:Parquet,被广泛用于 Spark、Hadoop 数据存储。Parquet 中文直译是镶木地板,意思是结构紧凑,空间占用率高。

1

概念

大规模分析型数据处理在互联网乃至其他行业中应用都已越来越广泛,尤其是当前已经可以用廉价的存储来收集、保存海量的业务数据情况下。如何让分析师和工程师便捷的利用这些数据也变得越来越重要。列式存储(Column-oriented Storage)是大数据场景面向分析型数据的主流存储方式。与行式存储相比,列存由于可以只提取部分数据列、同列同质数据拥有更好的编码及压缩方式,因此在 OLAP 场景下能提供更好的 IO 性能。

Apache Parquet 是由 Twitter 和 Cloudera 最先发起并合作开发的列存项目,也是 2010 年 Google 发表的 Dremel 论文中描述的内部列存格式的开源实现。和一些传统的列式存储(C-Store、MonetDB 等)系统相比,Dremel/Parquet 最大的贡献是支持嵌套格式数据(Nested Data)的列式存储。嵌套格式可以很自然的描述互联网和科学计算等领域的数据,Dremel/Parquet “原生”的支持嵌套格式数据减少了规则化、重新组合这些大规模数据的代价。

Parquet 的设计与计算框架、数据模型以及编程语言无关,可以与任意项目集成,因此应用广泛。目前已经是 Hadoop 大数据生态圈列式存储的事实标准。

2

原理

2.1 行存 VS 列存

例如,下图是拥有 A/B/C 3 个字段的简单示意表:

在面向行的存储中,每列的数据依次排成一行,如下所示:

而在面向列的存储中,相同列的数据存储在一起:

显而易见,行存适用于数据整行读取场景,而列存适用于只读取部分列数据(如统计分析等)场景。

2.2 数据模型

(1)schema 协议

想要深入的了解 Parquet 存储格式首先需要理解它的数据模型。Parquet 采用了一个类似 Google Protobuf 的协议来描述存储数据的 schema。下面是 Parquet 数据 schema 的一个简单示例:

  • message AddressBook   required string owner;  repeated string ownerPhoneNumbers;  repeated group contacts     required string name;    optional string phoneNumber;  

    schema 的最上层是 message,里面可以包含一系列字段。每个字段都拥有3个属性:重复性(repetition)、类型(type)以及名称(name)。字段类型可以是一个 group 或者原子类型(如 int/boolean/string 等),group 可以用来表示数据的嵌套结构。字段的重复性有三种情况:

  • required:有且只有一次

  • optional:0或1次

  • repeated:0或多次

  • 这个模型非常的简洁。一些复杂的数据类型如:Map/List/Set 也可以用重复的字段(repeated fields) + groups 来表达,因此也就不用再单独定义这些类型。

    采用 repeated field 表达 List 或者 Set 的示例:

    采用 repeated group(包含 key 和 value,其中 key 是 required) 来表达 Map 的示例:

    (2)列式存储格式

    试想一下,为了使数据能够按列存储,对于一条记录(Record),首先要将其按列(Column)进行拆分。对于扁平(Flat)结构数据,拆分比较直观,一个字段即对应一列,而嵌套格式数据会复杂些。Dremel/Parquet 中,提出以树状层级的形式组织 schema 中的字段(Field),树的叶子结点对应一个原子类型字段,这样这个模型能同时覆盖扁平结构和嵌套结构数据(扁平结构只是嵌套结构的一种特例)。嵌套字段的完整路径使用简单的点分符号表示,如 contacts. name。

    AddressBook 例子以树状结构展示的样式:

    列存连续的存储一个字段的值,以便进行高效的编码压缩及快速的读取。Dremel 中行存 vs 列存的图示:

    (3)Repetition and Definition Levels

    对于嵌套格式列存,除了按列拆分进行连续的存储,还需要能够“无损”的保留嵌套格式的结构化信息,以便正确的重建记录。

    只有字段值不能表达清楚记录的结构。给定一个重复字段的两个值,我们不知道此值是在什么“级别”被重复的(比如,这些值是来自两个不同的记录,还是相同的记录中两个重复的值)。同样的,给出一个缺失的可选字段,我们不知道整个路径有多少字段被显示定义了。

    Dremel 提出了 Repetition Level(重复级别)和 Definition Level(定义级别)两个概念,用以解决这个问题。并实现了记录中任意一个字段的恢复都不需要依赖其它字段,且可以对任意字段子集按原始嵌套格式进行重建。

    Repetition levels:用以表示在该字段路径上哪个节点进行了重复(at what repeated field in the field’s path the value has repeated)。

    一个重复字段存储的列值,有可能来自不同记录,也可能由同一记录的不同层级节点重复导致。如上图中的 Code 字段,他在 r1 记录中出现了 3 次,分别是字段 Name 和 Language 重复导致的,其中 Language 先重复了 2 次,Name 字段再重复了 1 次。

    Repetition Levels 采用数字代表重复节点的层级。根据树形层次结构,根结点为 0、下一层级为 1… 依次类推。根结点的重复暗含了记录的重复,也即 r=0 代表新记录的开始。required 和 optional 字段不需要 repetition level,只有可重复的字段需要。因此,上述 Code 字段的 repetition levels 范围为 0-2。当我们从上往下扫描 r1 记录,首先遇到 Code 的值是“en-us”,由于它之前没有该字段路径相关的字段出现,因此 r=0;其次遇到“en”,是 Language 层级重复导致的,r=2;最后遇到“en-gb”,是 Name 层级重复导致的,因此 r=1。所以,Code 字段的 repetition levels 在 r1 记录中是“0,2,1”。

    需要注意的是,r1 记录中的第二个重复 Name,由于其不包含 Code 字段,为了区分“en-gb”值是来自记录中的第三个 Name 而不是第二个,我们需要在“en”和“en-gb”之间插入一个值“null”。由于它是 Name 级重复的,因此它的 r=1。另外还需要注意一些隐含信息,比如 Code 是 required 字段类型,因此一旦 Code 出现未定义,则隐含表明其上级 Language 也肯定未定义。

    Definition Levels:用以表示该字段路径上有多少可选的字段实际进行了定义(how many fields in p that could be undefined (because they are optional or repeated) are actually present)。

    光有 Repetition Levels 尚无法完全保留嵌套结构信息,考虑上述图中 r1 记录的 Backward 字段。由于 r1 中未定义 Backward 字段,因此我们插入一个“null”并设置 r=0。但 Backward 的上级 Links 字段在 r1 中显式的进行了定义,null 和 r=0 无法再表达出这一层信息。因此需要额外再添加 Definition Levels 定义记录可选字段出现的个数,Backward 的路径上出现 1 个可选字段 Links,因此它的 d=1。

    有了 Definition Levels 我们就可以清楚的知道该值出现在字段路径的第几层,对未定义字段的 null 和字段实际的值为 null 也能进行区分。只有 optional 和 repeated 字段需要 Definition Levels 定义,因为 required 字段已经隐含了字段肯定被定义(这可以减少 Definition Levels 需要描述的数字,并在一定程度上节省后续的存储空间)。另外一些其他的隐含信息:如果 Definition Levels 小于路径中 optional + repeated 字段的数量,则该字段的值肯定为 null;Definition Levels 的值为 0 隐含了 Repeated Levels 也为 0(路径中没有 optional/repeated 字段或整个路径未定义)。

    (4)striping and assembly 算法

    现在把 Repetition Levels 和 Definition Levels 两个概念一起考虑。还是沿用上述 AddressBook 例子。下表显示了 AddressBook 中每个字段的最大重复和定义级别,并解释了为什么它们小于列的深度:

    假设这是两条真实的 AddressBook 数据:

  • AddressBook   owner: "Julien Le Dem",  ownerPhoneNumbers: "555 123 4567",  ownerPhoneNumbers: "555 666 1337",  contacts:     name: "Dmitriy Ryaboy",    phoneNumber: "555 987 6543",  ,  contacts:     name: "Chris Aniszczyk"  AddressBook   owner: "A. Nonymous"

    我们采用 contacts.phoneNumber 字段来演示一下拆解和重组记录的 striping and assembly 算法。

    仅针对 contacts.phoneNumber 字段投影后,数据具有如下结构:

  • AddressBook   contacts:     phoneNumber: "555 987 6543"    contacts:   AddressBook 

    计算可得该该字段对应的数据如下(R =重复级别,D =定义级别):

    因此我们最终存储的记录数据如下:

  • contacts.phoneNumber: “555 987 6543”new record: R = 0value is defined: D = maximum (2)contacts.phoneNumber: nullrepeated contacts: R = 1only defined up to contacts: D = 1contacts: nullnew record: R = 0only defined up to AddressBook: D = 0

    使用图表展示(注意其中的 null 值并不会实际存储,原因如上所说只要 Definition Levels 小于其 max 值即隐含该字段值为 null):

    在重组该记录时,我们重复读取该字段的值:

  • R=0, D=2, Value = “555 987 6543”:R = 0 means a new record. We recreate the nested records from the root until the definition level (here 2)D = 2 which is the maximum. The value is defined and is inserted.R=1, D=1:R = 1 means a new entry in the contacts list at level 1.D = 1 means contacts is defined but not phoneNumber, so we just create an empty contacts.R=0, D=0:R = 0 means a new record. we create the nested records from the root until the definition levelD = 0 => contacts is actually null, so we only have an empty AddressBook
    3

    实现

    Parquet 工程具体的实现。

    3.1 Parquet 文件存储格式中的术语

  • Block (hdfs block):即指 HDFS Block,Parquet 的设计与 HDFS 完全兼容。Block 是 HDFS 文件存储的基本单位,HDFS 会维护一个 Block 的多个副本。在 Hadoop 1.x 版本中 Block 默认大小 64M,Hadoop 2.x 版本中默认大小为 128M。

  • File:HDFS 文件,保存了该文件的元数据信息,但可以不包含实际数据(由 Block 保存)。

  • Row group:按照行将数据划分为多个逻辑水平分区。一个 Row group(行组)由每个列的一个列块(Column Chunk)组成。

  • Column chunk:一个列的列块,分布在行组当中,并在文件中保证是连续的。

  • Page:一个列块切分成多个 Pages(页面),概念上讲,页面是 Parquet 中最小的基础单元(就压缩和编码方面而言)。一个列块中可以有多个类型的页面。

  • 3.2 并行化执行的基本单元

  • MapReduce - File/Row Group(一个任务对应一个文件或一个行组)

  • IO - Column chunk(任务中的 IO 以列块为单位进行读取)

  • Encoding/Compression - Page(编码格式和压缩一次以一个页面为单位进行)

  • 3.3 Parquet 文件格式

    Parquet 文件格式是自解析的,采用 thrift 格式定义的文件 schema 以及其他元数据信息一起存储在文件的末尾。

    文件存储格式示例:

  • 4-byte magic number "PAR1"<Column 1 Chunk 1 + Column Metadata><Column 2 Chunk 1 + Column Metadata>...<Column N Chunk 1 + Column Metadata><Column 1 Chunk 2 + Column Metadata><Column 2 Chunk 2 + Column Metadata>...<Column N Chunk 2 + Column Metadata>...<Column 1 Chunk M + Column Metadata><Column 2 Chunk M + Column Metadata>...<Column N Chunk M + Column Metadata>File Metadata4-byte length in bytes of file metadata4-byte magic number "PAR1"

    整个文件(表)有 N 个列,划分成了 M 个行组,每个行组都有所有列的一个 Chunk 和其元数据信息。文件的元数据信息存储在数据之后,包含了所有列块元数据信息的起始位置。读取的时候首先从文件末尾读取文件元数据信息,再在其中找到感兴趣的 Column Chunk 信息,并依次读取。文件元数据信息放在文件最后是为了方便数据依序一次性写入。

    具体的存储格式展示图:

    3.4 元数据信息

    Parquet 总共有 3 种类型的元数据:文件元数据、列(块)元数据和 page header 元数据。所有元数据都采用 thrift 协议存储。具体信息如下所示:

    3.5 Parquet 数据类型

    在实现层级上,Parquet 只保留了最精简的部分数据类型,以方便存储和读写。在其上有逻辑类型(Logical Types)以供扩展,比如:逻辑类型 strings 就映射为带有 UTF8 标识的二进制 byte arrays 进行存储。

    Types:

  • BOOLEAN: 1 bit booleanINT32: 32 bit signed intsINT64: 64 bit signed intsINT96: 96 bit signed intsFLOAT: IEEE 32-bit floating point valuesDOUBLE: IEEE 64-bit floating point valuesBYTE_ARRAY: arbitrarily long byte arrays.

    逻辑类型的更多说明请参考:

    https://github.com/apache/parquet-format/blob/master/LogicalTypes.md

    3.6 Encoding

    数据编码的实现大部分和原理部分所阐述的一致,这里不再重复说明,更多细节可参考:https://github.com/apache/parquet-format/blob/master/Encodings.md

    3.7 Column chunks 存储

    Column chunks 由一个个 Pages 组成,Reader 在读取的时候可以根据 page header 信息跳过不感兴趣的页面。page header 中还存储着页面数据编码和压缩的信息。

    3.8 错误情况处理

    如果文件元数据损坏,则整个文件将丢失。如果列元数据损坏,则该列块将丢失(但其他行组中该列的列块还可以使用)。如果 page header 损坏,则该列块中的剩余页面都将丢失。如果页面中的数据损坏,则该页面将丢失。较小的文件行组配置,可以更有效地抵抗损坏。

    3.9 推荐配置

    行组大小(Row group size):更大的行组允许更大的列块,这使得可以执行更大的顺序 IO。不过更大的行组需要更大的写缓存。Parquet 建议使用较大的行组(512MB-1GB)。此外由于可能需要读取整个行组,因此最好一个行组能完全适配一个 HDFS Block。因此,HDFS 块大小也需要相应的设置更大。一个较优的读取配置为:行组大小 1GB,HDFS 块大小 1GB,每个 HDFS 文件对应 1 个 HDFS 块。

    数据页大小(Data page size):数据页应视为不可分割的,因此较小的数据页可实现更细粒度的读取(例如单行查找)。但较大的页面可以减少空间的开销(减少 page header 数量)和潜在的较少的解析开销(处理 headers)。Parquet 建议的页面大小为 8KB。

    参考资料:

    [1]. Dremel: Interactive Analysis of WebScale Datasets

    [2]. Dremel made simple with Parquet

    [3]. 经典论文翻译导读之《Dremel: Interactive Analysis of WebScale Datasets》

    [4]. 处理海量数据:列式存储综述(存储篇)

    [5]. https://parquet.apache.org/documentation/latest/


    作者简介

    大鲸,网易数帆有数实时数仓开发工程师,曾从事搜索系统、实时计算平台等相关工作。



    获取最新动态

    最新的推文无法在第一时间看到?

    以前的推文还需要复杂漫长的翻阅?

    进入“网易有数”公众号介绍页,点击右上角

    “设为星标”

    置顶公众号,从此消息不迷路


    设为星标,最新推文不迷路


    分享,点赞,在看,安排一下?

    SPARK Parquet嵌套类型的向量化支持以及列索引(column index)

    背景

    本文基于Spark 3.3.0
    列式存储Parquet文件越来越受到工业界的青睐,在delta以及Spark中应用广泛,具体的项目见:parquet-mr

    分析

    Parquet格式

    关于parquet的格式存储以及读取,可以参考大数据列存标准格式 - Parquet,总结一下就是:

    Parquet采用类似Protobuf的协议来描述数据的Schema,字段的描述有三种(逻辑上):

    required  有且仅有一次
    optional 0或1次
    repeated 0次或多次
    

    具体到物理存储,就得有Repetition Level(对应repeated),Definition Level(对应optional) ,required是不需要的,因为字段存在就有,不存在就没有。
    其实这种很好理解,因为在Dremel/Parquet中,提出的是以树状形式来组织schema中的字段,举例子:

    message AddressBook 
      required string owner;
      repeated string phoneNumber;
      repeated group contacts 
        required string name;
        optional string phoneNumber;
      
    
    

    对应到树形结构为:

                  AddressBook
              /           |               \\  
             V            V                V
           owner      phoneNumber    contacts
                                        /    \\
                                       V      V
                                     name    phoneNumber 
                                    
    

    这样用Repetition Level 和Definition Level来表示数据位于哪一层级就能精确定位一个数据。

    Parquet嵌套类型向量化

    根据以上的Parquet的格式存储,在读取的Parquet文件的时候,对于非向量化的读取,是一行一行的读取,支持所有类型,对于向量化的读取在Spark 3.3.0以前是不支持嵌套类型(如 struct map list)的。
    具体行读取如下(具体到page级别):

    向量化的读取如下:

    可以看到是按照批次读取的。
    注意:按照schema的定义,有些数据有可能是不存在的,所以对于同一列对应的Repetition Level 和Definition Level也有可能是不一样的。
    性能也是有很大的提升:

    Parquet的列索引

    在Parquet 1.11.0之前,Parquet是不支持列索引的,具体见PARQUET-1201
    在Spark 3.2.0 便支持了paruqet的列索引的读取,具体见:SPARK-26345
    在spark 3.2.0 之前Parquet的谓词下推是基于Row group的统计信息来的,如:最大最小值,字典信息,以及Parquet-1.12的Bloom filter,
    在Spark 3.2.0 之后,我们可以基于page级别的数据过滤(只选择需要的page),这样能大大减少IO,因为在page级别过滤的话,不需要每次都会获取整个Row group的数据。

    之前的读取是把对应Row group的数据全部读取过来,之后再进行过滤。

    以上是关于一文讲透大数据列存标准格式:Parquet的主要内容,如果未能解决你的问题,请参考以下文章

    一文讲透蓝牙WiFiZigBee无线通信模块

    一文讲透蓝牙WiFiZigBee无线通信模块

    一文讲透蓝牙WiFiZigBee无线通信模块

    SPARK Parquet嵌套类型的向量化支持以及列索引(column index)

    SPARK Parquet嵌套类型的向量化支持以及列索引(column index)

    一文剖析 PolarDB HTAP 的列存数据压缩