SparkSQL 子查询和性能

Posted

技术标签:

【中文标题】SparkSQL 子查询和性能【英文标题】:SparkSQL subquery and performance 【发布时间】:2018-11-11 07:50:54 【问题描述】:

为了允许系统用户动态创建(通过应用程序 Web UI)带有辅助数据的不同数据字典,我使用 DataFrames 并将它们公开为临时表,例如:

Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")

这些词典的数量只受用户想象力和业务需求的限制。

之后用户还可以创建不同的查询,这些查询可能会根据之前定义的辅助数据使用条件,例如 SQL WHERE 条件:

Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'

这些查询的数量仅受用户想象力和业务需求的限制。

我现在最担心的是像country IN (FROM medium_countries)这样的子查询

根据系统设计,我不能在这里使用显式JOIN,所以我限制使用子查询。所以我有一个问题 - 通常这些辅助数据表的大小应该相对较小......我认为最坏情况下有几千行,而这些表的总数 - 最坏情况下有几百行。考虑到这一点,这种方法是否会导致性能问题?是否存在任何可以优化流程的技术,例如将这些字典缓存在内存中等等?

更新

现在我只能在 Spark 本地模式下测试它

查询:

country IN (FROM big_countries)

执行计划:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan                                                                                                                                                                                                                                                                                                                                                                            |tag|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
   :- *(1) Project [country#22, unique_id#27L]
   :  +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [country#3]|big|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+

查询:

TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL

执行计划:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |tag|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
   +- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
      :- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
      :  :- *(1) Project [country#22, unique_id#27L]
      :  :  +- *(1) Filter isnotnull(EMAIL#20)
      :  :     +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      :     +- LocalTableScan [country#8]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
         +- LocalTableScan [country#3]|big|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+

【问题讨论】:

你能发布数据帧的物理执行计划吗? Dataframe.explain 的输出。 好的,更新了问题 【参考方案1】:

我认为:

CACHE TABLE tbl  as in sql("CACHE TABLE tbl")

是你需要在你之后执行的:

...createOrReplaceTempView....

当然是在更大的查询之前。

现在在 SPARK 中,上面关于“缓存”的声明现在默认是急切的,而不是懒惰的。正如手册所述,您不再需要手动触发缓存实现。也就是说,不再需要执行 df.show 或 df.count。

一旦在内存中 - 在您明确刷新之前,不需要每次都再次获取此数据,这里看起来没有过滤,而只是加载所有有限的数据集一次。

不知道你的设计,但看着它,子查询应该没问题。尝试这种方法并查看物理计划。在传统的 RDBMS 中,这种类型的受限子查询 - 据我所知 - 也不会破坏交易。

您还可以看到,物理计划显示 Catalyst Optimizer 已将您的 IN 子查询优化/转换为 JOIN,这是大型数据集的典型性能改进。

因此,将较小的表“广播”到执行程序的工作节点也可以提高性能。根据我的观察,您可能不需要为广播设置任何限制,但您可以明确设置,但可能不需要这样做。

【讨论】:

以上是关于SparkSQL 子查询和性能的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark SQL 中的 WITH 子句中缓存子查询结果

一次sparksql问题排查记录

SparkSQL 加入父/子数据集

表连接查询与where后使用子查询的性能分析。

UDF 与子查询性能问题

使用子查询时如何提高查询性能