如何让 Spark 使用 Parquet 文件中的分区信息?
Posted
技术标签:
【中文标题】如何让 Spark 使用 Parquet 文件中的分区信息?【英文标题】:How to make Spark use partition information from Parquet files? 【发布时间】:2016-02-11 23:07:34 【问题描述】:我正在尝试为一些 SparkSql 查询预先计算分区。如果我计算并保留分区,Spark 会使用它们。如果我将分区数据保存到 Parquet 并稍后重新加载,分区信息将消失,Spark 将重新计算它。
实际数据足够大,需要花费大量时间进行分区。下面的代码充分说明了这些问题。 Test2() 是目前我唯一可以开始工作的事情,但我想快速启动实际处理,这就是 test3() 正在尝试做的事情。
有人知道我做错了什么吗? ..或者如果这是 Spark 可以做的事情?
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# NOTE: Need to have python in PATH, SPARK_HOME set to location of spark, HADOOP_HOME set to location of winutils
if __name__ == "__main__":
sc = SparkContext(appName="PythonPartitionBug")
sql_text = "select foo, bar from customer c, orders o where foo < 300 and c.custkey=o.custkey"
def setup():
sqlContext = SQLContext(sc)
fields1 = [StructField(name, IntegerType()) for name in ['custkey', 'foo']]
data1 = [(1, 110), (2, 210), (3, 310), (4, 410), (5, 510)]
df1 = sqlContext.createDataFrame(data1, StructType(fields1))
df1.persist()
fields2 = [StructField(name, IntegerType()) for name in ['orderkey', 'custkey', 'bar']]
data2 = [(1, 1, 10), (2, 1, 20), (3, 2, 30), (4, 3, 40), (5, 4, 50)]
df2 = sqlContext.createDataFrame(data2, StructType(fields2))
df2.persist()
return sqlContext, df1, df2
def test1():
# Without repartition the final plan includes hashpartitioning
# == Physical Plan ==
# Project [foo#1,bar#14]
# +- SortMergeJoin [custkey#0], [custkey#13]
# :- Sort [custkey#0 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#0,200), None
# : +- Filter (foo#1 < 300)
# : +- InMemoryColumnarTableScan [custkey#0,foo#1], [(foo#1 < 300)], InMemoryRelation [custkey#0,foo#1], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
# +- Sort [custkey#13 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#13,200), None
# +- InMemoryColumnarTableScan [bar#14,custkey#13], InMemoryRelation [orderkey#12,custkey#13,bar#14], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
sqlContext, df1, df2 = setup()
df1.registerTempTable("customer")
df2.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test2():
# With repartition the final plan does not include hashpartitioning
# == Physical Plan ==
# Project [foo#56,bar#69]
# +- SortMergeJoin [custkey#55], [custkey#68]
# :- Sort [custkey#55 ASC], false, 0
# : +- Filter (foo#56 < 300)
# : +- InMemoryColumnarTableScan [custkey#55,foo#56], [(foo#56 < 300)], InMemoryRelation [custkey#55,foo#56], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#55,4), None, None
# +- Sort [custkey#68 ASC], false, 0
# +- InMemoryColumnarTableScan [bar#69,custkey#68], InMemoryRelation [orderkey#67,custkey#68,bar#69], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#68,4), None, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test3():
# After round tripping the partitioned data, the partitioning is lost and spark repartitions
# == Physical Plan ==
# Project [foo#223,bar#284]
# +- SortMergeJoin [custkey#222], [custkey#283]
# :- Sort [custkey#222 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#222,200), None
# : +- Filter (foo#223 < 300)
# : +- InMemoryColumnarTableScan [custkey#222,foo#223], [(foo#223 < 300)], InMemoryRelation [custkey#222,foo#223], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[custkey#222,foo#223] InputPaths: file:/E:/.../df1.parquet, None
# +- Sort [custkey#283 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#283,200), None
# +- InMemoryColumnarTableScan [bar#284,custkey#283], InMemoryRelation [orderkey#282,custkey#283,bar#284], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[orderkey#282,custkey#283,bar#284] InputPaths: file:/E:/.../df2.parquet, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.write.parquet("df1.parquet", mode='overwrite')
df1a = sqlContext.read.parquet("df1.parquet")
df1a.persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.write.parquet("df2.parquet", mode='overwrite')
df2a = sqlContext.read.parquet("df2.parquet")
df2a.persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
test1()
test2()
test3()
sc.stop()
【问题讨论】:
【参考方案1】:您没有做错任何事情 - 但您无法使用 Spark 实现您想要实现的目标:用于保存文件的分区器在写入磁盘时必然丢失。为什么?因为 Spark 没有自己的文件格式,它依赖于现有的格式(例如 Parquet、ORC 或文本文件),而这些格式甚至都不知道 Partitioner(这是 Spark 内部的),所以他们无法保留该信息。数据已在磁盘上正确分区,但 Spark 无法知道从磁盘加载时使用了哪个分区器,因此它别无选择,只能重新分区。
test2() 没有显示这一点的原因是您重用了相同的 DataFrame 实例,这些实例确实存储了分区信息(在内存中)。
【讨论】:
【参考方案2】:更好的解决方案是使用persist(StorageLevel.MEMORY_AND_DISK_ONLY)
,如果它们被从内存中逐出,它会将 RDD/DF 分区溢出到 Worker 的本地磁盘。这种情况下,重建分区只需要从Worker的本地磁盘拉取数据,速度比较快。
【讨论】:
以上是关于如何让 Spark 使用 Parquet 文件中的分区信息?的主要内容,如果未能解决你的问题,请参考以下文章
使用 spark 写入 parquet 文件时如何添加额外的元数据
如何对 Spark Streaming 生成的分区 parquet 文件进行适当的内务管理
如何使用许多小文件加速 Spark 的 parquet 阅读器