为啥 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

Posted

技术标签:

【中文标题】为啥 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?【英文标题】:Why does Apache Spark read unnecessary Parquet columns within nested structures?为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列? 【发布时间】:2017-03-04 06:15:07 【问题描述】:

我的团队正在构建一个 ETL 流程,以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的“数据湖”中。 Parquet 列存储的承诺之一是查询只会读取必要的“列条纹”。

但我们看到为嵌套架构结构读取了意外的列。

为了演示,这里是一个使用 Scala 和 Spark 2.0.1 shell 的 POC:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")

然后我们将文件读回 DataFrame 并投影到列的子集:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

当它运行时,我们会看到预期的输出:

+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

但是...查询计划显示的情况略有不同:

“优化方案”显示:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

并且“解释”显示:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

并且执行过程中产生的INFO日志也证实了Orig.F2列被意外读取:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema 
  optional int32 F1;
  optional group Orig 
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  


Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

根据Dremel paper 和Parquet documentation,复杂嵌套结构的列应该独立存储和独立检索。

问题:

    此行为是否是当前 Spark 查询引擎的限制?换句话说,Parquet 是否支持优化执行此查询,但 Spark 的查询计划器很幼稚? 或者,这是当前 Parquet 实现的限制吗? 或者,我没有正确使用 Spark API? 或者,我是否误解了 Dremel/Parquet 列存储的工作原理?

可能相关:Why does the query performance differ with nested columns in Spark SQL?

【问题讨论】:

这是 Spark 查询引擎的问题。 @LostInOverflow,你知道这是否在 Spark 问题跟踪器中吗? issues.apache.org/jira/browse/SPARK/… 根据@julien-le-dem twitter.com/J_/status/789584704169123841,Parquet 似乎应该支持这种情况@ github.com/apache/spark/pull/16578 解决了这个问题。 更新。之前的 PR 已经关闭,没有解决,新的简化 PR 已经打开。在此处跟踪新的:github.com/apache/spark/pull/21320 【参考方案1】:

目前是Spark查询引擎的一个限制,相关的JIRA票据如下,spark只处理Parquet中简单类型的谓词下推,不处理嵌套的StructTypes

https://issues.apache.org/jira/browse/SPARK-17636

【讨论】:

谓词下推的限制不应影响预测。问题可能相关但不相同。 对不起,我在回答中使用了谓词这个词,但链接的 JIRA 票证标题是“Parquet filter push down does not handle struct fields” 不确定这是不是答案,但会看看。 OP 没有过滤器,因此不应应用谓词下推。

以上是关于为啥 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache Spark 和 Scala 创建嵌套 json

为啥我们需要 kafka 向 apache spark 提供数据

Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID

缓存嵌套列时,Spark 是不是优化存储

为啥 spark 作业服务器中不支持带有 namedObject 的 sparkSession?

为啥嵌套循环在 laravel 中不起作用