理解 Spark 解释:Collect vs Global vs Local Limit

Posted

技术标签:

【中文标题】理解 Spark 解释:Collect vs Global vs Local Limit【英文标题】:Understanding Spark Explain: Collect vs Global vs Local Limit 【发布时间】:2019-05-25 02:52:05 【问题描述】:

我想看看在 Spark/AWS Glue 中做限制之间的区别

我尝试使用 Spark SQL

spark.sql("SELECT * FROM flights LIMIT 10")

解释看起来像:

CollectLimit 10
+- *FileScan parquet xxxxxx.flights[Id#31,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://xxxxxx/flights], PartitionCount: 14509, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...

然后我尝试使用 AWS Glue 数据目录来查看它是否更快

gdf = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "xxxxxx")
df = gdf.toDF()
df = df.limit(10)

df.explain(True)

df.show(10)

解释如下:

GlobalLimit 10
+- LocalLimit 10
+- LogicalRDD [Id#70, ...]

第一次在 5 分钟内运行,第二次在 4 分钟内运行,还不是很重要,但我认为查询数据目录更快或在数据帧中进行限制似乎比在 spark SQL 中进行限制更好?

收集与全局与本地限制之间有什么区别?我猜本地限制意味着它确实在本地限制,然后驱动程序将执行全局限制以给出最终结果。但是为什么 Spark SQL 不做这个优化呢?

Spark 会在做任何限制之前读取所有底层 parquet 文件吗?在这个例子中,有没有办法告诉 spark 读取直到它只有 10 行?

【问题讨论】:

【参考方案1】:
    SQL 方式,程序化数据集创建 - 两种情况下的控制流程相同,都通过 Spark SQL 催化剂。在您的情况下,当第一次运行查询时,它从元存储中获取有关表的元数据并将其缓存,在随后的查询中,它被重用,这可能是第一次查询速度慢的原因。李> 没有LogicalPlan节点作为CollectLimit,只有CollectLimitExec物理计划节点。而limit 实现为LocalLimit 后跟GlobalLimit(link to code) Spark 以增量方式执行limit。 它尝试使用一个分区检索给定数量的行。 如果不满足行数,Spark 会查询接下来的 4 个分区(由spark.sql.limit.scaleUpFactor 确定,默认 4 个),然后是 16 个,以此类推,直到满足限制或数据耗尽。

【讨论】:

以上是关于理解 Spark 解释:Collect vs Global vs Local Limit的主要内容,如果未能解决你的问题,请参考以下文章

Learning Spark [6] - Spark SQL高级函数

在 Spark SQL 中使用 collect_list 和 collect_set

[Spark][python]RDD的collect 作用是什么?

通过 Spark SQL 实现 `collect_list`

Pyspark spark.read.csv().collect() 返回一个空列表

spark collect获取所有元素