SparkSQL 子查询和性能
Posted
技术标签:
【中文标题】SparkSQL 子查询和性能【英文标题】:SparkSQL subquery and performance 【发布时间】:2018-11-11 07:50:54 【问题描述】:为了允许系统用户动态创建(通过应用程序 Web UI)带有辅助数据的不同数据字典,我使用 DataFrames 并将它们公开为临时表,例如:
Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")
这些词典的数量只受用户想象力和业务需求的限制。
之后用户还可以创建不同的查询,这些查询可能会根据之前定义的辅助数据使用条件,例如 SQL WHERE
条件:
Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'
这些查询的数量仅受用户想象力和业务需求的限制。
我现在最担心的是像country IN (FROM medium_countries)
这样的子查询
根据系统设计,我不能在这里使用显式JOIN
,所以我限制使用子查询。所以我有一个问题 - 通常这些辅助数据表的大小应该相对较小......我认为最坏情况下有几千行,而这些表的总数 - 最坏情况下有几百行。考虑到这一点,这种方法是否会导致性能问题?是否存在任何可以优化流程的技术,例如将这些字典缓存在内存中等等?
更新
现在我只能在 Spark 本地模式下测试它
查询:
country IN (FROM big_countries)
执行计划:
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
:- *(1) Project [country#22, unique_id#27L]
: +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
查询:
TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
执行计划:
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
+- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
:- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
: :- *(1) Project [country#22, unique_id#27L]
: : +- *(1) Filter isnotnull(EMAIL#20)
: : +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- LocalTableScan [country#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
【问题讨论】:
你能发布数据帧的物理执行计划吗? Dataframe.explain 的输出。 好的,更新了问题 【参考方案1】:我认为:
CACHE TABLE tbl as in sql("CACHE TABLE tbl")
是你需要在你之后执行的:
...createOrReplaceTempView....
当然是在更大的查询之前。
现在在 SPARK 中,上面关于“缓存”的声明现在默认是急切的,而不是懒惰的。正如手册所述,您不再需要手动触发缓存实现。也就是说,不再需要执行 df.show 或 df.count。
一旦在内存中 - 在您明确刷新之前,不需要每次都再次获取此数据,这里看起来没有过滤,而只是加载所有有限的数据集一次。
不知道你的设计,但看着它,子查询应该没问题。尝试这种方法并查看物理计划。在传统的 RDBMS 中,这种类型的受限子查询 - 据我所知 - 也不会破坏交易。
您还可以看到,物理计划显示 Catalyst Optimizer 已将您的 IN 子查询优化/转换为 JOIN,这是大型数据集的典型性能改进。
因此,将较小的表“广播”到执行程序的工作节点也可以提高性能。根据我的观察,您可能不需要为广播设置任何限制,但您可以明确设置,但可能不需要这样做。
【讨论】:
以上是关于SparkSQL 子查询和性能的主要内容,如果未能解决你的问题,请参考以下文章