Flink sql的实现

Posted 张包峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink sql的实现相关的知识,希望对你有一定的参考价值。

SQL Impl in Flink

跟了下Flink Table里sql的实现,flink sql的实现比较简单,一句话概述就是:借助Apache Calcite做了sql解析、逻辑树生成的过程,得到Calcite的RelRoot类,生成flink的TableTable里的执行计划会转化成DataSet的计算,经历物理执行计划优化等步骤。

类比Spark SQL,Calcite代替了大部分Spark SQL Catalyst的工作(Catalyst还包括了Tree/Node的定义,这部分代码Flink也’借鉴’来了)。两者最终是计算一颗逻辑执行计划树,翻译成各自的DataSet(Spark 2.0引入Dataset并统一DataFrame,隐藏RDD到引擎内部这层,类似于执行层内部的物理执行节点)。

Calcite Usage

最新Flink代码里,在flink-table工程里,使用1.7版本的calcite-core

大致的执行过程如下:

  1. TableEnvironment.sql()为调用入口
  2. 类似Calcite的PlannerImpl,flink实现了个FlinkPlannerImpl,执行parse(sql)validate(sqlNode)rel(sqlNode)操作
  3. 生成Table
  override def sql(query: String): Table = 

    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
    // parse the sql query
    val parsed = planner.parse(query)
    // validate the sql query
    val validated = planner.validate(parsed)
    // transform to a relational tree
    val relational = planner.rel(validated)

    new Table(this, LogicalRelNode(relational.rel))
  

LogicalRelNode是flink执行计算树里的叶子节点。其他节点的实现类最终都会转化成Calcite的RelBuilder生成一个可被Calcite继续执行计划优化的plan,逻辑在TableEnv的translate(table)方法里。

 protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = 

    val relNode = table.getRelNode

    // decorrelate
    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

    // optimize the logical Flink plan
    val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
    val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()

    val dataSetPlan = try 
      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
    
    catch 
      // ...
    

    dataSetPlan match 
      case node: DataSetRel =>
        node.translateToPlan(
          this,
          Some(tpe.asInstanceOf[TypeInformation[Any]])
        ).asInstanceOf[DataSet[A]]
      case _ => ???
    
  

Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)里,flink根据Calcite的接口定义了几个将最终物理计划转化为DataSet的Rule:

    // translate to Flink DataSet nodes
    DataSetAggregateRule.INSTANCE,
    DataSetCalcRule.INSTANCE,
    DataSetJoinRule.INSTANCE,
    DataSetScanRule.INSTANCE,
    DataSetUnionRule.INSTANCE,
    DataSetSortRule.INSTANCE,
    DataSetValuesRule.INSTANCE,
    BatchTableSourceScanRule.INSTANCE

每条规则会对应生成一个物理节点,org.apache.flink.api.table.plan.nodes.dataset package下。节点内,根据Calcite生成的sql的执行步骤,会进行codegen出DataSet的执行Function代码,在org.apache.flink.api.table.runtime package下,目前生成三种ds操作: FlatMapRunnerFlatJoinRunner, 和MapRunner.

codegen部分与Spark SQL的结构相类似。

Calcite在Flink中的使用也比较基本,单测sql package下的case就可以走通上面的调用过程。

整体Flink sql上的功能和实现要比Spark SQL简单很多。并可能存在许多借鉴之处。

以上是关于Flink sql的实现的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习 Flink Table & SQL 实现wordcount Java版本

Flink实战系列Flink SQL 如何实现 count window 功能?

Flink sql的实现

Flink sql的实现

flink1.12.1扩展flink-sql 支持写入到sqlserver

Flink实战系列Flink SQL 字符串类型的字段如何实现列转行?