Flink sql的实现
Posted 张包峰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink sql的实现相关的知识,希望对你有一定的参考价值。
SQL Impl in Flink
跟了下Flink Table里sql的实现,flink sql的实现比较简单,一句话概述就是:借助Apache Calcite做了sql解析、逻辑树生成的过程,得到Calcite的RelRoot类,生成flink的Table,Table里的执行计划会转化成DataSet的计算,经历物理执行计划优化等步骤。
类比Spark SQL,Calcite代替了大部分Spark SQL Catalyst的工作(Catalyst还包括了Tree/Node的定义,这部分代码Flink也’借鉴’来了)。两者最终是计算一颗逻辑执行计划树,翻译成各自的DataSet(Spark 2.0引入Dataset并统一DataFrame,隐藏RDD到引擎内部这层,类似于执行层内部的物理执行节点)。
Calcite Usage
最新Flink代码里,在flink-table
工程里,使用1.7版本的calcite-core
。
大致的执行过程如下:
- TableEnvironment.
sql()
为调用入口 - 类似Calcite的PlannerImpl,flink实现了个FlinkPlannerImpl,执行
parse(sql)
,validate(sqlNode)
,rel(sqlNode)
操作 - 生成Table
override def sql(query: String): Table =
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
// parse the sql query
val parsed = planner.parse(query)
// validate the sql query
val validated = planner.validate(parsed)
// transform to a relational tree
val relational = planner.rel(validated)
new Table(this, LogicalRelNode(relational.rel))
LogicalRelNode是flink执行计算树里的叶子节点。其他节点的实现类最终都会转化成Calcite的RelBuilder生成一个可被Calcite继续执行计划优化的plan,逻辑在TableEnv的translate(table)方法里。
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] =
val relNode = table.getRelNode
// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
// optimize the logical Flink plan
val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val dataSetPlan = try
optProgram.run(getPlanner, decorPlan, flinkOutputProps)
catch
// ...
dataSetPlan match
case node: DataSetRel =>
node.translateToPlan(
this,
Some(tpe.asInstanceOf[TypeInformation[Any]])
).asInstanceOf[DataSet[A]]
case _ => ???
在Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)里,flink根据Calcite的接口定义了几个将最终物理计划转化为DataSet的Rule:
// translate to Flink DataSet nodes
DataSetAggregateRule.INSTANCE,
DataSetCalcRule.INSTANCE,
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
DataSetUnionRule.INSTANCE,
DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE
每条规则会对应生成一个物理节点,org.apache.flink.api.table.plan.nodes.dataset
package下。节点内,根据Calcite生成的sql的执行步骤,会进行codegen出DataSet的执行Function代码,在org.apache.flink.api.table.runtime
package下,目前生成三种ds操作: FlatMapRunner, FlatJoinRunner, 和MapRunner.
codegen部分与Spark SQL的结构相类似。
Calcite在Flink中的使用也比较基本,单测sql
package下的case就可以走通上面的调用过程。
整体Flink sql上的功能和实现要比Spark SQL简单很多。并可能存在许多借鉴之处。
以上是关于Flink sql的实现的主要内容,如果未能解决你的问题,请参考以下文章
Flink学习 Flink Table & SQL 实现wordcount Java版本
Flink实战系列Flink SQL 如何实现 count window 功能?