Spark 是不是支持使用 Parquet 文件进行分区修剪
Posted
技术标签:
【中文标题】Spark 是不是支持使用 Parquet 文件进行分区修剪【英文标题】:Does Spark support Partition Pruning with Parquet FilesSpark 是否支持使用 Parquet 文件进行分区修剪 【发布时间】:2016-09-07 21:13:00 【问题描述】:我正在处理一个大型数据集,该数据集由两列分区 - plant_name
和 tag_id
。第二个分区 - tag_id
有 200000 个唯一值,我主要通过特定的 tag_id
值访问数据。如果我使用以下 Spark 命令:
sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")
我希望得到快速响应,因为这会解析为单个分区。在 Hive 和 Presto 中,这需要几秒钟,但在 Spark 中,它会运行几个小时。
实际数据保存在 S3 存储桶中,当我提交 sql 查询时,Spark 关闭并首先从 Hive 元存储中获取所有分区(其中 200000 个),然后调用refresh()
强制执行完整S3 对象存储中所有这些文件的状态列表(实际上是调用listLeafFilesInParallel
)。
正是这两个操作如此昂贵,是否有任何设置可以让 Spark 在调用元数据存储期间或之后立即修剪分区?
【问题讨论】:
我也试过上面的代码,加上一个额外的配置参数:sqlContext.setConf("spark.sql.hive.verifyPartitionPath", "false")
对性能没有影响
这是一个有趣的问题,但很难回答,因为您没有描述 tag_data
的 DataFrame 是如何创建的。我认为扩展这个问题是一个好主意,这样它就可以自己重现。
如果我对 Hive 和 Parquet 有更多了解,我可能会。事实上,我不知道如何创建一个(双重)分区的 Parquet 文件。我不清楚您是直接使用 Parquet 文件,还是以某种方式涉及 Hive。 (Hive 被提到过好几次了,但如果这只是一个 Parquet 文件,我不知道它的作用是什么。)
添加您的 spark 版本。我不确定,但可能会创建外部表(搜索它)会有所帮助(为此启用配置单元支持)。据我了解,它只会进行一次扫描,然后将这些数据保存在配置单元元数据存储中。下次你不会花这个开销。再次验证以上所有内容。
相关Spark lists all leaf node even in partitioned data
【参考方案1】:
是的,spark 支持分区修剪。
Spark 会列出分区目录(顺序或并行listLeafFilesInParallel
),以便在第一时间构建所有分区的缓存。扫描数据的同一应用程序中的查询会利用此缓存。所以你看到的缓慢可能是因为这个缓存构建。扫描数据的后续查询使用缓存来修剪分区。
这些是显示列出的分区以填充缓存的日志。
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-01 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-02 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-03 on driver
这些是显示正在修剪的日志。
App > 16/11/10 12:29:16 main INFO DataSourceStrategy: Selected 1 partitions out of 20, pruned 95.0% partitions.
参考HiveMetastoreCatalog.scala
中的convertToParquetRelation
和getHiveQlPartitions
。
【讨论】:
【参考方案2】:只是一个想法:
HadoopFsRelation 的 Spark API 文档说, (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/sources/HadoopFsRelation.html)
"...从存储在文件中的 Hive 样式分区表读取时 系统,它能够从路径中发现分区信息 输入目录,并在开始前执行分区修剪 正在读取数据...”
所以,我想“listLeafFilesInParallel”可能不是问题。
类似的问题已经在 spark jira 中:https://issues.apache.org/jira/browse/SPARK-10673
尽管“spark.sql.hive.verifyPartitionPath”设置为 false 并且对性能没有影响,但我怀疑 问题可能是由未注册的分区引起的。请列出表的分区并验证是否全部 分区已注册。否则,请按照以下链接恢复您的分区:
Hive doesn't read partitioned parquet files generated by Spark
更新:
我猜想在写入数据时设置了适当的 parquet 块大小和页面大小。
使用提到的分区创建一个新的 hive 表,文件格式为 parquet,使用动态分区方法从非分区表加载它。 (https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions) 运行一个普通的 hive 查询,然后通过运行 spark 程序进行比较。
免责声明:我不是火花/镶木地板专家。这个问题听起来很有趣,因此得到了回应。
【讨论】:
【参考方案3】:最近出现了类似的问题: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-reads-all-leaf-directories-on-a-partitioned-Hive-table-td35997.html#a36007
这个问题很老,但我想我也会在这里发布解决方案。
spark.sql.hive.convertMetastoreParquet=false
将使用 Hive parquet serde,而不是 spark 内置 parquet serde。 Hive 的 Parquet serde 不会在所有分区上执行 listLeafFiles,而只会直接从选定的分区中读取。在具有许多分区和文件的表上,这要快得多(也更便宜)。随意尝试吧! :)
【讨论】:
以上是关于Spark 是不是支持使用 Parquet 文件进行分区修剪的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL - 如何将 DataFrame 写入文本文件?
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
SPARK Parquet嵌套类型的向量化支持以及列索引(column index)
使用Spark读写Parquet文件验证Parquet自带表头的性质及NULL值来源Java