如何让 Spark 使用 Parquet 文件中的分区信息?

Posted

技术标签:

【中文标题】如何让 Spark 使用 Parquet 文件中的分区信息?【英文标题】:How to make Spark use partition information from Parquet files? 【发布时间】:2016-02-11 23:07:34 【问题描述】:

我正在尝试为一些 SparkSql 查询预先计算分区。如果我计算并保留分区,Spark 会使用它们。如果我将分区数据保存到 Parquet 并稍后重新加载,分区信息将消失,Spark 将重新计算它。

实际数据足够大,需要花费大量时间进行分区。下面的代码充分说明了这些问题。 Test2() 是目前我唯一可以开始工作的事情,但我想快速启动实际处理,这就是 test3() 正在尝试做的事情。

有人知道我做错了什么吗? ..或者如果这是 Spark 可以做的事情?

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# NOTE: Need to have python in PATH, SPARK_HOME set to location of spark, HADOOP_HOME set to location of winutils
if __name__ == "__main__":
    sc = SparkContext(appName="PythonPartitionBug")
    sql_text = "select foo, bar from customer c, orders o where foo < 300 and c.custkey=o.custkey"

    def setup():
        sqlContext = SQLContext(sc)
        fields1 = [StructField(name, IntegerType()) for name in ['custkey', 'foo']]
        data1 = [(1, 110), (2, 210), (3, 310), (4, 410), (5, 510)]
        df1 = sqlContext.createDataFrame(data1, StructType(fields1))
        df1.persist()
        fields2 = [StructField(name, IntegerType()) for name in ['orderkey', 'custkey', 'bar']]
        data2 = [(1, 1, 10), (2, 1, 20), (3, 2, 30), (4, 3, 40), (5, 4, 50)]
        df2 = sqlContext.createDataFrame(data2, StructType(fields2))
        df2.persist()
        return sqlContext, df1, df2

    def test1():
        # Without repartition the final plan includes hashpartitioning
        # == Physical Plan ==
        # Project [foo#1,bar#14]
        # +- SortMergeJoin [custkey#0], [custkey#13]
        #    :- Sort [custkey#0 ASC], false, 0
        #    :  +- TungstenExchange hashpartitioning(custkey#0,200), None
        #    :     +- Filter (foo#1 < 300)
        #    :        +- InMemoryColumnarTableScan [custkey#0,foo#1], [(foo#1 < 300)], InMemoryRelation [custkey#0,foo#1], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
        #    +- Sort [custkey#13 ASC], false, 0
        #       +- TungstenExchange hashpartitioning(custkey#13,200), None
        #          +- InMemoryColumnarTableScan [bar#14,custkey#13], InMemoryRelation [orderkey#12,custkey#13,bar#14], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
        sqlContext, df1, df2 = setup()
        df1.registerTempTable("customer")
        df2.registerTempTable("orders")
        df3 = sqlContext.sql(sql_text)
        df3.collect()
        df3.explain(True)

    def test2():
        # With repartition the final plan does not include hashpartitioning
        # == Physical Plan ==
        # Project [foo#56,bar#69]
        # +- SortMergeJoin [custkey#55], [custkey#68]
        #    :- Sort [custkey#55 ASC], false, 0
        #    :  +- Filter (foo#56 < 300)
        #    :     +- InMemoryColumnarTableScan [custkey#55,foo#56], [(foo#56 < 300)], InMemoryRelation [custkey#55,foo#56], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#55,4), None, None
        #    +- Sort [custkey#68 ASC], false, 0
        #       +- InMemoryColumnarTableScan [bar#69,custkey#68], InMemoryRelation [orderkey#67,custkey#68,bar#69], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#68,4), None, None
        sqlContext, df1, df2 = setup()
        df1a = df1.repartition(4, 'custkey').persist()
        df1a.registerTempTable("customer")

        df2a = df2.repartition(4, 'custkey').persist()
        df2a.registerTempTable("orders")

        df3 = sqlContext.sql(sql_text)
        df3.collect()
        df3.explain(True)

    def test3():
        # After round tripping the partitioned data, the partitioning is lost and spark repartitions
        # == Physical Plan ==
        # Project [foo#223,bar#284]
        # +- SortMergeJoin [custkey#222], [custkey#283]
        #    :- Sort [custkey#222 ASC], false, 0
        #    :  +- TungstenExchange hashpartitioning(custkey#222,200), None
        #    :     +- Filter (foo#223 < 300)
        #    :        +- InMemoryColumnarTableScan [custkey#222,foo#223], [(foo#223 < 300)], InMemoryRelation [custkey#222,foo#223], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[custkey#222,foo#223] InputPaths: file:/E:/.../df1.parquet, None
        #    +- Sort [custkey#283 ASC], false, 0
        #       +- TungstenExchange hashpartitioning(custkey#283,200), None
        #          +- InMemoryColumnarTableScan [bar#284,custkey#283], InMemoryRelation [orderkey#282,custkey#283,bar#284], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[orderkey#282,custkey#283,bar#284] InputPaths: file:/E:/.../df2.parquet, None
        sqlContext, df1, df2 = setup()
        df1a = df1.repartition(4, 'custkey').persist()
        df1a.write.parquet("df1.parquet", mode='overwrite')
        df1a = sqlContext.read.parquet("df1.parquet")
        df1a.persist()
        df1a.registerTempTable("customer")

        df2a = df2.repartition(4, 'custkey').persist()
        df2a.write.parquet("df2.parquet", mode='overwrite')
        df2a = sqlContext.read.parquet("df2.parquet")
        df2a.persist()
        df2a.registerTempTable("orders")

        df3 = sqlContext.sql(sql_text)
        df3.collect()
        df3.explain(True)

    test1()
    test2()
    test3()
    sc.stop()

【问题讨论】:

【参考方案1】:

您没有做错任何事情 - 但您无法使用 Spark 实现您想要实现的目标:用于保存文件的分区器在写入磁盘时必然丢失。为什么?因为 Spark 没有自己的文件格式,它依赖于现有的格式(例如 Parquet、ORC 或文本文件),而这些格式甚至都不知道 Partitioner(这是 Spark 内部的),所以他们无法保留该信息。数据已在磁盘上正确分区,但 Spark 无法知道从磁盘加载时使用了哪个分区器,因此它别无选择,只能重新分区。

test2() 没有显示这一点的原因是您重用了相同的 DataFrame 实例,这些实例确实存储了分区信息(在内存中)。

【讨论】:

【参考方案2】:

更好的解决方案是使用persist(StorageLevel.MEMORY_AND_DISK_ONLY),如果它们被从内存中逐出,它会将 RDD/DF 分区溢出到 Worker 的本地磁盘。这种情况下,重建分区只需要从Worker的本地磁盘拉取数据,速度比较快。

【讨论】:

以上是关于如何让 Spark 使用 Parquet 文件中的分区信息?的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark 写入 parquet 文件时如何添加额外的元数据

如何对 Spark Streaming 生成的分区 parquet 文件进行适当的内务管理

如何使用许多小文件加速 Spark 的 parquet 阅读器

Spark SQL 中的 Parquet 文件

Spark - 如何将 Bz2 文件解压缩为 parquet 文件

使用 spark sql 重命名 Parquet 文件中列名中的空格