理解 Spark 解释:Collect vs Global vs Local Limit
Posted
技术标签:
【中文标题】理解 Spark 解释:Collect vs Global vs Local Limit【英文标题】:Understanding Spark Explain: Collect vs Global vs Local Limit 【发布时间】:2019-05-25 02:52:05 【问题描述】:我想看看在 Spark/AWS Glue 中做限制之间的区别
我尝试使用 Spark SQL
spark.sql("SELECT * FROM flights LIMIT 10")
解释看起来像:
CollectLimit 10
+- *FileScan parquet xxxxxx.flights[Id#31,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://xxxxxx/flights], PartitionCount: 14509, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
然后我尝试使用 AWS Glue 数据目录来查看它是否更快
gdf = glueContext.create_dynamic_frame.from_catalog(database = "xxxxxx", table_name = "xxxxxx")
df = gdf.toDF()
df = df.limit(10)
df.explain(True)
df.show(10)
解释如下:
GlobalLimit 10
+- LocalLimit 10
+- LogicalRDD [Id#70, ...]
第一次在 5 分钟内运行,第二次在 4 分钟内运行,还不是很重要,但我认为查询数据目录更快或在数据帧中进行限制似乎比在 spark SQL 中进行限制更好?
收集与全局与本地限制之间有什么区别?我猜本地限制意味着它确实在本地限制,然后驱动程序将执行全局限制以给出最终结果。但是为什么 Spark SQL 不做这个优化呢?
Spark 会在做任何限制之前读取所有底层 parquet 文件吗?在这个例子中,有没有办法告诉 spark 读取直到它只有 10 行?
【问题讨论】:
【参考方案1】:-
SQL 方式,程序化数据集创建 - 两种情况下的控制流程相同,都通过 Spark SQL 催化剂。在您的情况下,当第一次运行查询时,它从元存储中获取有关表的元数据并将其缓存,在随后的查询中,它被重用,这可能是第一次查询速度慢的原因。李>
没有
LogicalPlan
节点作为CollectLimit
,只有CollectLimitExec
物理计划节点。而limit
实现为LocalLimit
后跟GlobalLimit
(link to code)
Spark 以增量方式执行limit
。
它尝试使用一个分区检索给定数量的行。
如果不满足行数,Spark 会查询接下来的 4 个分区(由spark.sql.limit.scaleUpFactor
确定,默认 4 个),然后是 16 个,以此类推,直到满足限制或数据耗尽。
【讨论】:
以上是关于理解 Spark 解释:Collect vs Global vs Local Limit的主要内容,如果未能解决你的问题,请参考以下文章
Learning Spark [6] - Spark SQL高级函数
在 Spark SQL 中使用 collect_list 和 collect_set
[Spark][python]RDD的collect 作用是什么?
通过 Spark SQL 实现 `collect_list`