Presto系列 | 四Presto Query Planner And Optimizer

Posted 雨钓Moowei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Presto系列 | 四Presto Query Planner And Optimizer相关的知识,希望对你有一定的参考价值。

在深入研究Presto查询规划器和基于成本的优化如何工作之前,让我们先建立一个查询,并针对这个查询进行分析,以帮助理解查询规划的过程。

实例使用了TPC-H数据集,目的是汇总每个nation的所有order的totalprice值并列出排名前五的。

-- 实例一:
SELECT
    (SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
    n.name AS nation_name,
    sum(totalprice) orders_sum
FROM nation n, orders o, customer c
WHERE n.nationkey = c.nationkey
    AND c.custkey = o.custkey
GROUP BY n.nationkey, regionkey, n.name
ORDER BY orders_sum DESC
LIMIT 5;

如上SQL所示:子查询的目的是从region表中提取region_name

一、Parsing and Analysis

在计划执行之前,需要对其进行转化和分析,Presto首先会根据语法规则校验SQL文本,之后就是对查询进行分析:

1.1、确认查询中的Tables

Presto中的表是根据catalogs+Schemas进行组织的,二者确保唯一,因此不同的表Schema下可以具有相同名称的表,例如,TPC-H数据中就有多个表名为orders的表,但是他们在不同的Schema下面,如:sf10.orders 以及 sf100.orders

1.2、标识查询中使用的colums

如SQL中所示,orders.totalprice即明确的引用了order表中的totalprice 列,当SQL中涉及的表里没有相同字段时,通常直接写Column名就可以,Presto Analyzer会自动确定Column来自哪个表。

1.3、确定ROW中Field的引用

单纯的给出一个表达式,如:c.bonus;它的含义可能表示c表中的bonus列,但也可能是指一个复杂类型的列的名字为C,且C中一个字段为bonus,如何区分主要由Presto决定,但如果有冲突发生时,优先按第一种情况处理。 解析过程会遵循SQL语言的作用域和可见性规则, 也会收集一些信息,如标识符消歧,这些收集到的信息稍后会在查询计划规划的过程中使用, 这样Planner 就不需要再次理解理解查询语言的规则,避免重复工作。

Query Analyzer具有复杂的功能, 它的实现是非常有技术性的。对于用户来说,只要用户输入的查询是正确的,那么Query Analyzer对用户就是透明的,只有当查询违反了SQL语法,超过用户权限或由于其他原因导致错误时,Query Analyzer才会主动提示用户;

一旦分析完成,处理并解析了查询中的所有标识符,Presto进入下一个阶段:Query Planning

二、Initial Query Planning

Query Planning可以看做是获取查询结果的流程,需要注意的是SQL是一种声明式的语言,即用户编写一个SQL来指定他们希望从系统获得的数据。 这与命令式程序有很大的不同,命令式程序通常需要指定如何处理数据;而使用SQL时,用户不指定如何处理数据以获得结果,这些步骤和顺序留给Query PlannerOptimizer**来确定。

这一系列步骤通常称为Query Plan。理论上,很多的不同的Query Planning可以产生相同的查询结果,但彼此的性能可能会相差很大,这就是为什么Presto planner和Optimizer总是试图确定最优计划的原因。通常我们将那些可以产生相同执行结果的计划称为:equivalent plans

让我们考虑本文最开始提到的那个SQL,关于这个SQL最简单的查询计划就是按照SQL查询语法结构进行规划,如下所示, 执行计划就是一棵树,它的执行从叶子节点开始,沿着树结构向上进行。

- Limit[5]
    - Sort[orders_sum DESC]
        - LateralJoin[2]
            - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
                - Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey]
                    - CrossJoin
                        - CrossJoin
                            - TableScan[nation]
                            - TableScan[orders]
                        - TableScan[customer]
                - EnforceSingleRow[region_name := r.name]
                    - Filter[r.regionkey = n.regionkey]
                        - TableScan[region]

查询计划的每个元素的具体实现都很简单,例如:
TableScan:访问表的底层存储并返回一个包含该表数据的结果集。
FilTer :会过滤掉数据中的一些行,只保留满足条件的行;
CrossJoin :对来自子节点的两个数据集进行操作, 它将两个数据集中的每个行进行两两组合,也可能将其中一个数据集存储在内存中,这样就不需要多次访问底层存储。

最新的Presto版本更改了查询计划中操作的命名。例如,TableScan 修改为 ScanProject,而Filter修改为FilterProject,但相应的功能没有改变

现在让我们考虑这个查询计划的计算复杂度。在不知道所有表实际数据细节的情况下,我们无法完全把握其复杂性。但是我们可以进行如下的假设:一个查询计划节点的复杂度的下限就是他所生成数据的大小,即查询节点的复杂度与他生成的数据的行数正相关。因此我们使用Big Omega(Ω)来进行描述。如果 N,O,C以及R分别表示 nation,Orders,custoner以及region几张表里的行数,我们可以进行如下描述:

  • ***TableScan[orders]***读取order表,返回了O行数据,所以他的复杂度是:Ω(O)。同理其他两个TableScans分别返回N行和C行数据;即Ω(N) 和Ω©
  • 在 TableScan[nation]和TableSca[orders]之上的CrossJoin 对来自nation和orders表的数据进行合并,他的复杂度是:Ω(N × O)
  • 在上一层的CrossJoin将读取customer数据的TableScan[Customer]和上一个复杂度为Ω(N × O)的CrossJoin的数据进行合并,复杂度为:Ω(N × O × C).
  • 位于底层的TableScan[region]复杂度为:Ω®。但是由于LateralJoin(也就是SELECT中的子查询)他被调用N次,N就是Aggregate返回的行数,所以他的复杂度是:Ω(R × N)
  • Sort操作需要对N行进行排序因此他花费的时间不能少于 N × log(N)(注:以mergeSort算法复杂度为为准)

如果暂时不考虑其他成本,执行计划的消耗至少是:Ω[N + O + C + (N × O)+ (N × O × C) + (R × N) + (N × log(N))]
在不知相对表大小的情况下可以将其简化为 Ω[(N × O × C) + (R × N) + (N × log(N))]
按照一般经验,region和nation通常很小,如果我们假设,region是最小的表,并且nation是第二小的表,那么我们可以忽略结果的第二部分和第三部分得到最终结果:Ω(N × O × C)

代数公式讲的差不多了,是时候看看这在实践中意味着什么了,让我们举个例子,一个广受欢迎的购物网站有来自200个nations的1亿用户,他们总共下了10亿份orders。那么这两个表的CrossJoin需要(20,000,000,000,000,000,000)行数据。 对于一个健壮的拥有100节点的中等集群,每个节点每秒处理100万行, 那么计算该查询对应的中间数据将花费63个世纪。

当然,Presto肯定不会去执行这样一个不切实际的计划。不过一个幼稚的计划也有他的作用。这个初版的执行计划可以作为SQL语法和查询优化二者之前的桥梁。 查询优化的作用是将初始计划转换为一个与之等效的计划,且转化后的计划可以在Presto集群资源有限的情况下尽可能快地完成执行任务,至少在合理的时间内完成执行任务。

三、Optimization Rules

接下来讨论一下查询优化是如何达到这个目标的。

3.1、Predicate Pushdown

Predicate pushdown 即所谓的谓词下推,他可能是最重要也是最容易理解的优化策略,它的做法是尽可能的将过滤条件靠近数据源,使得在执行查询之前尽可能的过滤掉无用的数据。针对上面的例子如果应用该优化策略的话,结果如下所示:
之前的执行计划(一部分:)

...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
    - Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // original filter
        - CrossJoin
            - CrossJoin
                - TableScan[nation]
                - TableScan[orders]
            - TableScan[customer]
...

优化后的执行计划:

...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
    - Filter[c.nationkey = n.nationkey] // transformed simpler filter
        - InnerJoin[o.custkey = c.custkey] // added inner join
            - CrossJoin
                - TableScan[nation]
                - TableScan[orders]
            - TableScan[customer]
...

即在不改变表关联前后关系的基础上,将之前的Filter转化为更为简单的Filter 同时将大的CrossJoin转化为InnerJoin,且前后两个执行计划是等效的(即上文提到的equivalent plans),如果假设这样的JOIN可以在分布式系统中实现,我们依然按照之前的约定:用生成数据的行数表示计算复杂度。那么结果就是该优化策略将之前复杂度为Ω(N × O × C)的CrossJoin替换成了复杂度为Ω(N × O)的JOIN

如上所示,谓词下推只替换可一个CrossJoin,并没有对nation表和orders表之间的CrossJoin进行替换,主要是因为nation和orders表之间没有关联条件,只能使用CrossJoin,那该如何消除这个CrossJoin呢,这就要使用 Cross Join Elimination 优化策略了。

3.2、Cross Join Elimination

也许有人会疑问,既然nation和orders表之间没有关联条件,才导致两个表关联只能使用CrossJoin,那为什么上面的执行计划要先将没有关联条件nation和orders表进行关联?

这主要是因为在没有基于成本的优化器(即cost-based optimizer下文会讲到)时,在ELECT的SQL中,Presto通常按照Table出现的前后顺序安排表间的JOIN顺序。所以才会出现上面的情况。

事实上,在大部分情况下,CrossJoin都不是必须的,都可以进行优化,因为基本都会对CrossJoin之后的数据进行过滤,只获取满足条件的数据。但CrossJoin代价是很大的,有可能永远也无法执行完;

Cross Join Elimination的目的就是对表之间的JOIN顺序进行重新排列,以减少CrossJoin的数量,最理想的情况是没有CrossJoin。在不清楚相关表大小的情况下,如果没有cross join elimination,那就需要用户在写SQL时进行控制,注意表的顺序。使用cross join elimination前后的结果如下所示:

...
    - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
        - Filter[c.nationkey = n.nationkey] // filter on nationkey first
            - InnerJoin[o.custkey = c.custkey] // then inner join cutkey
                - CrossJoin
                    - TableScan[nation]
                    - TableScan[orders]
                - TableScan[customer]
...

使用cross join elimination之后:即先Join nation和customer,之后在JOIN orders

...
    - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
        - InnerJoin[c.custkey = o.custkey] // reordered to custkey first
            - InnerJoin[n.nationkey = c.nationkey] // then nationkey
                - TableScan[nation]
                - TableScan[customer]
            - TableScan[orders]
...

3.3、TopN

通常情况下,如果SQL中有LIMIT,它的前面也会有Order BY子句;因为如果没有ORDER 子句,SQL不会保证返回哪些行。正如文章开头提到的查询中我们在LIMIT之前也使用了ORDER BY;

当执行这样的查询时,Presto会对所有结果数据进行排序并返回前几行数据。这种方法的复杂度为 Ω(row_count × log(row_count)) 同时内存占用为 Ω(row_count)

然而如果仅仅是为了获取排序之后的前几的数据,却需要保留所有已排序的数据,这是一种浪费。因此一种优化规则是,将后面带有LIMIT的ORDER BY查询转化为TopN, 在查询执行期间,TopN在堆数据结构中保存所需的行,流式的读取数据并更新堆数据。这使得计算复杂度降低到 Ω(row_count × log(limit)) 并且内存占用为 Ω(limit) ,总体的查询成本为: Ω[O + (R × N) + N]

3.4、Partial Aggregations

Presto不需要将orders表中的所有行传递给join,因为我们对单个订单不感兴趣,我们的示例SQL中是要计算每一个nation的totalprice的汇总,因此可以进行预聚合,如下所示;

...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
    - InnerJoin[c.custkey = o.custkey]
        - InnerJoin[n.nationkey = c.nationkey]
            - TableScan[nation]
            - TableScan[customer]
        - Aggregate[by custkey; totalprice := sum(totalprice)]
            - TableScan[orders]
...

我们通过预聚合数据来减少流向下游的数据量,预聚合的结果是不完整的,但数据量会显著的的减少,从而提升性能;

为了提高并行性,这种预聚合的实现方式是不同的,该方式被称为:Partial Aggregations 。 这里,我们呈现的是简化的计划,但是在实际的EXPLAIN计划中,这与最终的汇总会有所不同

需要注意的是,如上所示的预聚合并汇总是可以实现优化的,但如果预聚合不能减少数据量时,查询性能将会受到影响。出于该原因, 目前该优化在默认情况下是禁用的,可以通过session中的 push_partial_aggregation_through_join 切换启用。默认情况下,会将预聚合放在JOIN上以减少Presto中节点间的数据传输量, 为了更有效的利用Partial Aggregations的优势,我们需要充分考虑实际情况。

四、Implementation Rules

到目前为止,我们介绍的规则都是优化规则,这些规则的目标是减少查询处理时间减少查询的内存占用减少通过网络交换的数据量。但是上面的示例SQL还包含一个我们一直没有提到的操作:lateral join

4.1、Lateral Join Decorrelation

lateral join 类似一个for-each循环,他遍历数据集中的所有行并针对每一行执行相应的查询,但是Presto并非这样处理的。相反,Presto会将其转换为一个left join,用SQL表示如下:
原始的SQL

SELECT
    (SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
    n.name AS nation_name
FROM nation n

转化后的SQL:

SELECT
    r.name AS region_name,
     n.name AS nation_name
FROM nation n
LEFT OUTER JOIN region r ON r.regionkey = n.regionkey

但是需要注意的是,二者并非完全等价的。因为在第一个SQL中,当region表的regionkey存在重复数据时,查询会出错(只有当region表中regionkey字段唯一时才可以有效执行)。但是第二个查询在此情况下可以正常执行且不会失败,而是生成多行数据,正因如此lateral join会在转换时添加两个额外的条件:首先,他对所有的数据行进行编号,以便于区分;其次,在连接之后会检测是否有重复行,如果存在重复行,那么查询将失败,以保证转换后的SQL与之前的语义完全一致。如以下示例中所示:

- TopN[5; orders_sum DESC]
    - MarkDistinct & Check
        - LeftJoin[n.regionkey = r.regionkey]
            - AssignUniqueId
                - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
                    - ...
            - TableScan[region]

4.2、Semi-Join (IN) Decorrelation

正如文章开头提到的SQL所示,可以在查询中使用子查询,这样不仅可以提取所需的信息(正如我们在 lateral join示例中看到的那样),还可以使用IN谓词过滤行。事实上,IN可以在Where子句中使用,也可以在SELECT子句中使用,当你在SELECT中使用IN时,并不是简单的Boolean值的操作,这与EXISTS有很大的不同,相反,IN可以计算为true、false、或者 null

让我们考虑这样一个查询,他的目的是查找来自同一国家的客户(customer表)和产品供应商(supplier表)的订单。SQL如下:

SELECT DISTINCT o.orderkey
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer c ON o.custkey = c.custkey
WHERE c.nationkey IN (
    -- subquery invoked multiple times
    SELECT s.nationkey
    FROM part p
    JOIN partsupp ps ON p.partkey = ps.partkey
    JOIN supplier s ON ps.suppkey = s.suppkey
    WHERE p.partkey = l.partkey
);

与lateral join一样,这可以通过循环执行子查询来实现,其中将多次调用子查询来检索所有suppliers的国家。

Presto没有这样做,相反在Presto的实现中,子查询只计算一次,并且将子查询中的关联条件去掉,而是通过关联条件将子查询与外部查询进行JOIN。
这种处理实现的难点是不要产生多个结果(这就要使用deduplicating aggregation),并且正确地保留了IN语法的三值逻辑(即IN值可以是true、false、null)。

在这种情况下,deduplicating aggregation使用与JOIN相同的分区,因此可以以流式的执行,无需通过网络进行数据交换,占用的内存最少。

以上是关于Presto系列 | 四Presto Query Planner And Optimizer的主要内容,如果未能解决你的问题,请参考以下文章

Presto系列 | 五Tuning Presto SQL Query

Presto系列 | 五Tuning Presto SQL Query

presto .vs impala .vs HAWQ query engine

Presto + query.max-memory-per-node 配置

用于 In Query 的 Athena/Presto 拆分字符串

Presto系列 | 三Presto Architecture