Presto系列 | 四Presto Query Planner And Optimizer
Posted 雨钓Moowei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Presto系列 | 四Presto Query Planner And Optimizer相关的知识,希望对你有一定的参考价值。
在深入研究Presto查询规划器和基于成本的优化如何工作之前,让我们先建立一个查询,并针对这个查询进行分析,以帮助理解查询规划的过程。
实例使用了TPC-H数据集,目的是汇总每个nation的所有order的totalprice值并列出排名前五的。
-- 实例一:
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
n.name AS nation_name,
sum(totalprice) orders_sum
FROM nation n, orders o, customer c
WHERE n.nationkey = c.nationkey
AND c.custkey = o.custkey
GROUP BY n.nationkey, regionkey, n.name
ORDER BY orders_sum DESC
LIMIT 5;
如上SQL所示:子查询的目的是从region表中提取region_name
一、Parsing and Analysis
在计划执行之前,需要对其进行转化和分析,Presto首先会根据语法规则校验SQL文本,之后就是对查询进行分析:
1.1、确认查询中的Tables
Presto中的表是根据catalogs+Schemas进行组织的,二者确保唯一,因此不同的表Schema下可以具有相同名称的表,例如,TPC-H数据中就有多个表名为orders的表,但是他们在不同的Schema下面,如:sf10.orders 以及 sf100.orders
1.2、标识查询中使用的colums
如SQL中所示,orders.totalprice即明确的引用了order表中的totalprice 列,当SQL中涉及的表里没有相同字段时,通常直接写Column名就可以,Presto Analyzer会自动确定Column来自哪个表。
1.3、确定ROW中Field的引用
单纯的给出一个表达式,如:c.bonus;它的含义可能表示c表中的bonus列,但也可能是指一个复杂类型的列的名字为C,且C中一个字段为bonus,如何区分主要由Presto决定,但如果有冲突发生时,优先按第一种情况处理。 解析过程会遵循SQL语言的作用域和可见性规则, 也会收集一些信息,如标识符消歧,这些收集到的信息稍后会在查询计划规划的过程中使用, 这样Planner 就不需要再次理解理解查询语言的规则,避免重复工作。
Query Analyzer具有复杂的功能, 它的实现是非常有技术性的。对于用户来说,只要用户输入的查询是正确的,那么Query Analyzer对用户就是透明的,只有当查询违反了SQL语法,超过用户权限或由于其他原因导致错误时,Query Analyzer才会主动提示用户;
一旦分析完成,处理并解析了查询中的所有标识符,Presto进入下一个阶段:Query Planning
二、Initial Query Planning
Query Planning可以看做是获取查询结果的流程,需要注意的是SQL是一种声明式的语言,即用户编写一个SQL来指定他们希望从系统获得的数据。 这与命令式程序有很大的不同,命令式程序通常需要指定如何处理数据;而使用SQL时,用户不指定如何处理数据以获得结果,这些步骤和顺序留给Query Planner和Optimizer**来确定。
这一系列步骤通常称为Query Plan。理论上,很多的不同的Query Planning可以产生相同的查询结果,但彼此的性能可能会相差很大,这就是为什么Presto planner和Optimizer总是试图确定最优计划的原因。通常我们将那些可以产生相同执行结果的计划称为:equivalent plans
让我们考虑本文最开始提到的那个SQL,关于这个SQL最简单的查询计划就是按照SQL查询语法结构进行规划,如下所示, 执行计划就是一棵树,它的执行从叶子节点开始,沿着树结构向上进行。
- Limit[5]
- Sort[orders_sum DESC]
- LateralJoin[2]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey]
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
- EnforceSingleRow[region_name := r.name]
- Filter[r.regionkey = n.regionkey]
- TableScan[region]
查询计划的每个元素的具体实现都很简单,例如:
TableScan:访问表的底层存储并返回一个包含该表数据的结果集。
FilTer :会过滤掉数据中的一些行,只保留满足条件的行;
CrossJoin :对来自子节点的两个数据集进行操作, 它将两个数据集中的每个行进行两两组合,也可能将其中一个数据集存储在内存中,这样就不需要多次访问底层存储。
最新的Presto版本更改了查询计划中操作的命名。例如,TableScan 修改为 ScanProject,而Filter修改为FilterProject,但相应的功能没有改变。
现在让我们考虑这个查询计划的计算复杂度。在不知道所有表实际数据细节的情况下,我们无法完全把握其复杂性。但是我们可以进行如下的假设:一个查询计划节点的复杂度的下限就是他所生成数据的大小,即查询节点的复杂度与他生成的数据的行数正相关。因此我们使用Big Omega(Ω)来进行描述。如果 N,O,C以及R分别表示 nation,Orders,custoner以及region几张表里的行数,我们可以进行如下描述:
- ***TableScan[orders]***读取order表,返回了O行数据,所以他的复杂度是:Ω(O)。同理其他两个TableScans分别返回N行和C行数据;即Ω(N) 和Ω©
- 在 TableScan[nation]和TableSca[orders]之上的CrossJoin 对来自nation和orders表的数据进行合并,他的复杂度是:Ω(N × O)
- 在上一层的CrossJoin将读取customer数据的TableScan[Customer]和上一个复杂度为Ω(N × O)的CrossJoin的数据进行合并,复杂度为:Ω(N × O × C).
- 位于底层的TableScan[region]复杂度为:Ω®。但是由于LateralJoin(也就是SELECT中的子查询)他被调用N次,N就是Aggregate返回的行数,所以他的复杂度是:Ω(R × N)
- Sort操作需要对N行进行排序因此他花费的时间不能少于 N × log(N)(注:以mergeSort算法复杂度为为准)
如果暂时不考虑其他成本,执行计划的消耗至少是:Ω[N + O + C + (N × O)+ (N × O × C) + (R × N) + (N × log(N))]
在不知相对表大小的情况下可以将其简化为 Ω[(N × O × C) + (R × N) + (N × log(N))]
按照一般经验,region和nation通常很小,如果我们假设,region是最小的表,并且nation是第二小的表,那么我们可以忽略结果的第二部分和第三部分得到最终结果:Ω(N × O × C)
代数公式讲的差不多了,是时候看看这在实践中意味着什么了,让我们举个例子,一个广受欢迎的购物网站有来自200个nations的1亿用户,他们总共下了10亿份orders。那么这两个表的CrossJoin需要(20,000,000,000,000,000,000)行数据。 对于一个健壮的拥有100节点的中等集群,每个节点每秒处理100万行, 那么计算该查询对应的中间数据将花费63个世纪。
当然,Presto肯定不会去执行这样一个不切实际的计划。不过一个幼稚的计划也有他的作用。这个初版的执行计划可以作为SQL语法和查询优化二者之前的桥梁。 查询优化的作用是将初始计划转换为一个与之等效的计划,且转化后的计划可以在Presto集群资源有限的情况下尽可能快地完成执行任务,至少在合理的时间内完成执行任务。
三、Optimization Rules
接下来讨论一下查询优化是如何达到这个目标的。
3.1、Predicate Pushdown
Predicate pushdown 即所谓的谓词下推,他可能是最重要也是最容易理解的优化策略,它的做法是尽可能的将过滤条件靠近数据源,使得在执行查询之前尽可能的过滤掉无用的数据。针对上面的例子如果应用该优化策略的话,结果如下所示:
之前的执行计划(一部分:)
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // original filter
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
优化后的执行计划:
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // transformed simpler filter
- InnerJoin[o.custkey = c.custkey] // added inner join
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
即在不改变表关联前后关系的基础上,将之前的Filter转化为更为简单的Filter 同时将大的CrossJoin转化为InnerJoin,且前后两个执行计划是等效的(即上文提到的equivalent plans),如果假设这样的JOIN可以在分布式系统中实现,我们依然按照之前的约定:用生成数据的行数表示计算复杂度。那么结果就是该优化策略将之前复杂度为Ω(N × O × C)的CrossJoin替换成了复杂度为Ω(N × O)的JOIN
如上所示,谓词下推只替换可一个CrossJoin,并没有对nation表和orders表之间的CrossJoin进行替换,主要是因为nation和orders表之间没有关联条件,只能使用CrossJoin,那该如何消除这个CrossJoin呢,这就要使用 Cross Join Elimination 优化策略了。
3.2、Cross Join Elimination
也许有人会疑问,既然nation和orders表之间没有关联条件,才导致两个表关联只能使用CrossJoin,那为什么上面的执行计划要先将没有关联条件nation和orders表进行关联?
这主要是因为在没有基于成本的优化器(即cost-based optimizer下文会讲到)时,在ELECT的SQL中,Presto通常按照Table出现的前后顺序安排表间的JOIN顺序。所以才会出现上面的情况。
事实上,在大部分情况下,CrossJoin都不是必须的,都可以进行优化,因为基本都会对CrossJoin之后的数据进行过滤,只获取满足条件的数据。但CrossJoin代价是很大的,有可能永远也无法执行完;
Cross Join Elimination的目的就是对表之间的JOIN顺序进行重新排列,以减少CrossJoin的数量,最理想的情况是没有CrossJoin。在不清楚相关表大小的情况下,如果没有cross join elimination,那就需要用户在写SQL时进行控制,注意表的顺序。使用cross join elimination前后的结果如下所示:
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // filter on nationkey first
- InnerJoin[o.custkey = c.custkey] // then inner join cutkey
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
使用cross join elimination之后:即先Join nation和customer,之后在JOIN orders
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey] // reordered to custkey first
- InnerJoin[n.nationkey = c.nationkey] // then nationkey
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...
3.3、TopN
通常情况下,如果SQL中有LIMIT,它的前面也会有Order BY子句;因为如果没有ORDER 子句,SQL不会保证返回哪些行。正如文章开头提到的查询中我们在LIMIT之前也使用了ORDER BY;
当执行这样的查询时,Presto会对所有结果数据进行排序并返回前几行数据。这种方法的复杂度为 Ω(row_count × log(row_count)) 同时内存占用为 Ω(row_count) ;
然而如果仅仅是为了获取排序之后的前几的数据,却需要保留所有已排序的数据,这是一种浪费。因此一种优化规则是,将后面带有LIMIT的ORDER BY查询转化为TopN, 在查询执行期间,TopN在堆数据结构中保存所需的行,流式的读取数据并更新堆数据。这使得计算复杂度降低到 Ω(row_count × log(limit)) 并且内存占用为 Ω(limit) ,总体的查询成本为: Ω[O + (R × N) + N] 。
3.4、Partial Aggregations
Presto不需要将orders表中的所有行传递给join,因为我们对单个订单不感兴趣,我们的示例SQL中是要计算每一个nation的totalprice的汇总,因此可以进行预聚合,如下所示;
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- Aggregate[by custkey; totalprice := sum(totalprice)]
- TableScan[orders]
...
我们通过预聚合数据来减少流向下游的数据量,预聚合的结果是不完整的,但数据量会显著的的减少,从而提升性能;
为了提高并行性,这种预聚合的实现方式是不同的,该方式被称为:Partial Aggregations 。 这里,我们呈现的是简化的计划,但是在实际的EXPLAIN计划中,这与最终的汇总会有所不同
需要注意的是,如上所示的预聚合并汇总是可以实现优化的,但如果预聚合不能减少数据量时,查询性能将会受到影响。出于该原因, 目前该优化在默认情况下是禁用的,可以通过session中的 push_partial_aggregation_through_join 切换启用。默认情况下,会将预聚合放在JOIN上以减少Presto中节点间的数据传输量, 为了更有效的利用Partial Aggregations的优势,我们需要充分考虑实际情况。
四、Implementation Rules
到目前为止,我们介绍的规则都是优化规则,这些规则的目标是减少查询处理时间、减少查询的内存占用或减少通过网络交换的数据量。但是上面的示例SQL还包含一个我们一直没有提到的操作:lateral join ;
4.1、Lateral Join Decorrelation
lateral join 类似一个for-each循环,他遍历数据集中的所有行并针对每一行执行相应的查询,但是Presto并非这样处理的。相反,Presto会将其转换为一个left join,用SQL表示如下:
原始的SQL
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
n.name AS nation_name
FROM nation n
转化后的SQL:
SELECT
r.name AS region_name,
n.name AS nation_name
FROM nation n
LEFT OUTER JOIN region r ON r.regionkey = n.regionkey
但是需要注意的是,二者并非完全等价的。因为在第一个SQL中,当region表的regionkey存在重复数据时,查询会出错(只有当region表中regionkey字段唯一时才可以有效执行)。但是第二个查询在此情况下可以正常执行且不会失败,而是生成多行数据,正因如此lateral join会在转换时添加两个额外的条件:首先,他对所有的数据行进行编号,以便于区分;其次,在连接之后会检测是否有重复行,如果存在重复行,那么查询将失败,以保证转换后的SQL与之前的语义完全一致。如以下示例中所示:
- TopN[5; orders_sum DESC]
- MarkDistinct & Check
- LeftJoin[n.regionkey = r.regionkey]
- AssignUniqueId
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- ...
- TableScan[region]
4.2、Semi-Join (IN) Decorrelation
正如文章开头提到的SQL所示,可以在查询中使用子查询,这样不仅可以提取所需的信息(正如我们在 lateral join示例中看到的那样),还可以使用IN谓词过滤行。事实上,IN可以在Where子句中使用,也可以在SELECT子句中使用,当你在SELECT中使用IN时,并不是简单的Boolean值的操作,这与EXISTS有很大的不同,相反,IN可以计算为true、false、或者 null
让我们考虑这样一个查询,他的目的是查找来自同一国家的客户(customer表)和产品供应商(supplier表)的订单。SQL如下:
SELECT DISTINCT o.orderkey
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer c ON o.custkey = c.custkey
WHERE c.nationkey IN (
-- subquery invoked multiple times
SELECT s.nationkey
FROM part p
JOIN partsupp ps ON p.partkey = ps.partkey
JOIN supplier s ON ps.suppkey = s.suppkey
WHERE p.partkey = l.partkey
);
与lateral join一样,这可以通过循环执行子查询来实现,其中将多次调用子查询来检索所有suppliers的国家。
Presto没有这样做,相反在Presto的实现中,子查询只计算一次,并且将子查询中的关联条件去掉,而是通过关联条件将子查询与外部查询进行JOIN。
这种处理实现的难点是不要产生多个结果(这就要使用deduplicating aggregation),并且正确地保留了IN语法的三值逻辑(即IN值可以是true、false、null)。
在这种情况下,deduplicating aggregation使用与JOIN相同的分区,因此可以以流式的执行,无需通过网络进行数据交换,占用的内存最少。
以上是关于Presto系列 | 四Presto Query Planner And Optimizer的主要内容,如果未能解决你的问题,请参考以下文章
Presto系列 | 五Tuning Presto SQL Query
Presto系列 | 五Tuning Presto SQL Query
presto .vs impala .vs HAWQ query engine
Presto + query.max-memory-per-node 配置