Spark SQL OptimizedPlan
Posted 刘姥爷观园子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL OptimizedPlan相关的知识,希望对你有一定的参考价值。
Spark SQL(6) OptimizedPlan
在这一步spark sql主要应用一些规则,优化生成的Resolved Plan,这一步涉及到的有Optimizer。
之前介绍在sparksession实例化的是会实例化sessionState,进而确定QueryExecution、Analyzer,Optimizer也是在这一步确定的:
protected def optimizer: Optimizer = { new SparkOptimizer(catalog, experimentalMethods) { override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules } }
Optimizer也是RuleExecutor的子类,而SparkOptimizer是Optimizer子类,在analyzed步骤知道,其实主要规则就是RuleExecutor子类定义的batchs的规则
sparkOptimizer:
override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+ Batch("Push down operators to data source scan", Once, PushDownOperatorsToDataSource)) ++ postHocOptimizationBatches :+ Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
Optimizer:
def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, FoldablePropagation, OptimizeIn, ConstantFolding, ReorderAssociativeOperator, LikeSimplification, BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, SimplifyBinaryComparison, PruneFilters, EliminateSorts, SimplifyCasts, SimplifyCaseConversionExpressions, RewriteCorrelatedScalarSubquery, EliminateSerialization, RemoveRedundantAliases, RemoveRedundantProject, SimplifyCreateStructOps, SimplifyCreateArrayOps, SimplifyCreateMapOps, CombineConcats) ++ extendedOperatorOptimizationRules val operatorOptimizationBatch: Seq[Batch] = { val rulesWithoutInferFiltersFromConstraints = operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints) Batch("Operator Optimization before Inferring Filters", fixedPoint, rulesWithoutInferFiltersFromConstraints: _*) :: Batch("Infer Filters", Once, InferFiltersFromConstraints) :: Batch("Operator Optimization after Inferring Filters", fixedPoint, rulesWithoutInferFiltersFromConstraints: _*) :: Nil } (Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), // we do not eliminate subqueries or compute current time in the analyzer. Batch("Finish Analysis", Once, EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase(sessionCatalog), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here ////////////////////////////////////////////////////////////////////////////////////////// // - Do the first call of CombineUnions before starting the major Optimizer rules, // since it can reduce the number of iteration and the other rules could add/move // extra operators between two adjacent Union operators. // - Call CombineUnions again in Batch("Operator Optimizations"), // since the other rules might make two separate Unions operators adjacent. Batch("Union", Once, CombineUnions) :: Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: Batch("Subquery", Once, OptimizeSubqueries) :: Batch("Replace Operators", fixedPoint, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, ReplaceDistinctWithAggregate) :: Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ Batch("Join Reorder", Once, CostBasedJoinReorder) :+ Batch("Decimal Optimizations", fixedPoint, DecimalAggregates) :+ Batch("Object Expressions Optimization", fixedPoint, EliminateMapObjects, CombineTypedFilters) :+ Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :+ // The following batch should be executed after batch "Join Reorder" and "LocalRelation". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ Batch("RewriteSubquery", Once, RewritePredicateSubquery, ColumnPruning, CollapseProject, RemoveRedundantProject) }
如上这便是在优化这步的所有的规则和策略例如消除子查询别名,表达式替换、算子下推、常量折叠等优化规则,经过这一步之后,就进入物理计划阶段了。
以上是关于Spark SQL OptimizedPlan的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark Databricks 中注册 SQL 函数