FlinkFlink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现相关的知识,希望对你有一定的参考价值。

1.概述

转载:Flink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现

在使用 SQL 进行数据分析的过程中,关联查询是经常要使用到的操作。在传统的 OLTP 和 OLAP 领域中,关联查询的数据集都是有界的,因此可以依赖于缓存有界的数据集进行查询。但是在 Streaming SQL 中,针对 Stream Join Stream 的情况,由于关联查询的两侧都是连续无界的数据流,传统数据表中 Join 操作的实现和优化方式可能并不完全适用。在这篇文章中,我们将介绍双流 Join 面临的挑战,并对 Flink SQL 中双流 Join 的具体实现机制进行分析。

2.双流 Join 的挑战

在传统的数据库或批处理场景中 ,关联查询的数据集都是有限的,因此可以依赖于缓存有界的数据集,使用诸如 Nested-Loop Join,Sort-Merged Join 或者 Hash Join 等方法进行匹配查询。但是在 Streaming SQL 中,两个数据流的关联查询主要面临如下两个问题:一方面,数据流是无限的,缓存数据对 long-running 的任务而言会带来较高的存储和查询压力;另一方面,两侧数据流中消息到达的时间存在不一致的情况,可能造成关联结果的缺失。

对于上述的第一个问题,为了保证关联结果的正确性,需要将数据流中所有历史数据缓存下来。随着两个数据流中数据源源不断到来,缓存历史数据带来的开销越来越大,且每一条新到达的消息都会激发对另一侧历史数据的查询。为了解决该问题,一种方法是通过时间窗口将关联的数据范围限制在特定的时间范围内,即 Window Join(关于时间窗口可以参考之前的文章);另一种方法是,在存储开销和关联准确度方面做一下权衡,在缓存的历史数据上增加生存时间的限制,这样可以避免缓存的数据无限增长,但相应地可能会造成准确度的降低。


上述第二个问题,主要针对的是外连接的情况。由于两侧数据到达时间的不确定性,对于特定消息,可能出现 t1 时刻不存在匹配记录,而 t2 (t2 > t1) 时刻存在匹配记录的情况。对于外连接,要求在不存在关联结果的情况下返回 NULL 值。因此为了保证关联结果的正确性,一种方式是通过时间窗口限制关联的数据范围,但这样就要求在窗口结束时才输出结果,会导致输出延迟;另一种方式是采取“撤销-更正”的方式,先输出 NULL 值,在后续关联记录到达时再撤销已输出的记录,修正为关联的正确结果,其缺点是会造成输出记录数的放大

从上述的分析可以看出,时间窗口在关联查询中通过限制关联数据的范围,可以部分程度上解决 Streaming Join 面临的问题,其基本思路是将无限的数据流切分为有限的时间窗口。但时间窗口关联并不适合所有的情况,很多时候两个数据流的关联查询并不能限定在特定的时间窗口内;此外,时间窗口关联还存在输出延迟的情况

本文的后续部分将对 Flink SQL 中普通双流 Join 的实现机制加以介绍,Window Join 的实现机制将在后续的文章中进行分析。

3.双流 Join 的实现机制

一条 Join 语句的转换
首先,我们以一条简单的 Join 语句为例,跟踪一条 Join 语句的变换过程。

-- table A('a1, 'a2, 'a3)
-- table B('b1, 'b2, 'b3)

SELECT a1, b1 FROM A JOIN B ON a1 = b1 and a2 > b2

上述的 SQL 语句在经过解析后,被转换为如下的逻辑计划:

LogicalProject(a1=[$0], b1=[$3])
+- LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[inner])
   :- LogicalTableScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]])
   +- LogicalTableScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]])

这份逻辑计划首先被转换为 Flink SQL 内部的 RelNode,即:

FlinkLogicalCalc(select=[a1, b1])
+- FlinkLogicalJoin(condition=[AND(=($0, $2), >($1, $3))], joinType=[inner])
   :- FlinkLogicalCalc(select=[a1, a2])
   :  +- FlinkLogicalTableSourceScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
   +- FlinkLogicalCalc(select=[b1, b2])
      +- FlinkLogicalTableSourceScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])

此后,经过一系列优化规则被优化为最终的执行计划,如下:

Calc(select=[a1, b1])
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), >(a2, b2))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[a1]])
   :  +- Calc(select=[a1, a2])
   :     +- TableSourceScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
   +- Exchange(distribution=[hash[b1]])
      +- Calc(select=[b1, b2])
         +- TableSourceScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])

至此,逻辑计划的优化阶段结束,进入物理计划生成的阶段。

Flink SQL 会为 StreamExecJoin 操作生成一个 TwoInputTransformation 变换,内部算子为 StreamingJoinOperator,用于在两个数据流中匹配关联记录;为 StreamExecExchange 操作生成一个 PartitionTransformation 变换,用来确定上游算子输出的记录转发到下游算子的分区。

4.两个重要的变换规则

在逻辑计划优化的过程中,有两个重要的规则需要关注,分别是 StreamExecJoinRuleFlinkExpandConversionRule

顾名思义,StreamExecJoinRule 主要用于将 FlinkLogicalJoin 转换为 StreamExecJoin。但是这个变换是有条件限制的,即 FlinkLogicalJoin 的关联条件中不包含时间窗口。首先来看一下这个规则的匹配条件:

class StreamExecJoinRule
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalRel], any())),
    "StreamExecJoinRule") 

  override def matches(call: RelOptRuleCall): Boolean = 
    val join: FlinkLogicalJoin = call.rel(0)
    //关联结果是否需要从右表投射数据,SEMI JOIN 和 ANTI JOIN 不需要选择右表的数据
    if (!join.getJoinType.projectsRight) 
      // SEMI/ANTI JOIN 总是被转换为 StreamExecJoin
      return true
    
    val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
    val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
    val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
    val joinRowType = join.getRowType

    //左表不支持 Temporal Table
    if (left.isInstanceOf[FlinkLogicalSnapshot]) 
      throw new TableException(
        "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.")
    

    //不支持 Temporal Table JOIN
    if (right.isInstanceOf[FlinkLogicalSnapshot] ||
      TemporalJoinUtil.containsTemporalJoinCondition(join.getCondition)) 
      return false
    

    //从关联条件中提取 1)时间窗口边界 2)其它条件
    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
      join.getCondition,
      join.getLeft.getRowType.getFieldCount,
      joinRowType,
      join.getCluster.getRexBuilder,
      tableConfig)

    //存在窗口,则不适用于该规则
    if (windowBounds.isDefined) 
      return false
    

    //普通关联条件不能访问时间属性
    // remaining predicate must not access time attributes
    val remainingPredsAccessTime = remainingPreds.isDefined &&
      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, joinRowType)

    //RowTime 属性不能出现在普通 join 的关联条件中
    //@see https://stackoverflow.com/questions/57181771/flink-rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join
    val rowTimeAttrInOutput = joinRowType.getFieldList
      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    if (rowTimeAttrInOutput) 
      throw new TableException(
        "Rowtime attributes must not be in the input rows of a regular join. " +
          "As a workaround you can cast the time attributes of input tables to TIMESTAMP before.")
    

    // joins require an equality condition
    // or a conjunctive predicate with at least one equality condition
    // and disable outer joins with non-equality predicates(see FLINK-5520)
    // And do not accept a FlinkLogicalTemporalTableSourceScan as right input
    !remainingPredsAccessTime
  

其基本逻辑就是,在普通的双流 Join 中不支持 Temporal Table,不支持时间窗口,不支持访问时间属性。这里需要注意的一点是,在普通的双流 Join 中,Flink 没法保证关联结果按照时间先后顺序提交,会破坏时间属性的顺序,因此在普通的双流 Join 中关联条件不支持时间属性。

StreamExecJoinRule 会将 FlinkLogicalJoin 转换为 StreamexecJoin,但相应地,需要先对 FlinkLogicalJoin 的两个输入进行变换。在这里,会将 FlinkRelDistribution 这个 trait 下推到输入算子中。

FlinkRelDistribution 用于确定上游算子结果转发到下游算子的分区信息。例如,如果关联条件中存在等值关联条件,那么就会按照对应的关联键进行哈希分区,确保相同键的记录被转发到相同的 Task 中,即 FlinkRelDistribution.hash;而如果关联条件中不存在等值条件,那么所有的记录只能被转发到同一个 Task 中,即 FlinkRelDistribution.SINGLETON

class StreamExecJoinRule
  override def onMatch(call: RelOptRuleCall): Unit = 
    val join: FlinkLogicalJoin = call.rel(0)
    val left = join.getLeft
    val right = join.getRight

    //根据是否存在等值关联条件确定 FlinkRelDistribution
    def toHashTraitByColumns(
        columns: util.Collection[_ <: Number],
        inputTraitSets: RelTraitSet): RelTraitSet = 
      val distribution = if (columns.isEmpty) 
        FlinkRelDistribution.SINGLETON
       else 
        FlinkRelDistribution.hash(columns)
      
      inputTraitSets
        .replace(FlinkConventions.STREAM_PHYSICAL)
        .replace(distribution)
    

    val joinInfo = join.analyzeCondition()
    val (leftRequiredTrait, rightRequiredTrait) = (
      toHashTraitByColumns(joinInfo.leftKeys, left.getTraitSet),
      toHashTraitByColumns(joinInfo.rightKeys, right.getTraitSet))

    val providedTraitSet = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)

    //变换输入
    val newLeft: RelNode = RelOptRule.convert(left, leftRequiredTrait)
    val newRight: RelNode = RelOptRule.convert(right, rightRequiredTrait)

    //生成 StreamExecJoin
    val newJoin = new StreamExecJoin(
      join.getCluster,
      providedTraitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
    call.transformTo(newJoin)
  

FlinkRelDistribution 的匹配变换规则在 FlinkExpandConversionRule 中。FlinkExpandConversionRule 的作用是处理 RelDistributionRelCollation 这两种 trait,其中 RelDistribution 描述数据的物理分布情况,RelCollation 描述排序情况(通常在 Batch 模式下应用在 ORDER BY 语句中)。

FlinkExpandConversionRule 中会为目标 trait 包含 FlinkRelDistribution 的变换生成一个 StreamExecExchange:

class FlinkExpandConversionRule(flinkConvention: Convention)
  extends RelOptRule(
    operand(classOf[AbstractConverter],
      operand(classOf[RelNode], any)),
    "FlinkExpandConversionRule") 
  override def matches(call: RelOptRuleCall): Boolean = 
    // from trait 和 to trait 不一致
    val toTraitSet = call.rel(0).asInstanceOf[AbstractConverter].getTraitSet
    val fromTraitSet = call.rel(1).asInstanceOf[RelNode].getTraitSet
    toTraitSet.contains(flinkConvention) &&
      fromTraitSet.contains(flinkConvention) &&
      !fromTraitSet.satisfies(toTraitSet)
  

  override def onMatch(call: RelOptRuleCall): Unit = 
    val converter: AbstractConverter = call.rel(0)
    val child: RelNode = call.rel(1)
    val toTraitSet = converter.getTraitSet
    // try to satisfy required trait by itself.
    satisfyTraitsBySelf(child, toTraitSet, call)
    // try to push down required traits to children.
    satisfyTraitsByInput(child, toTraitSet, call)
  

  private def satisfyTraitsBySelf(
      node: RelNode,
      requiredTraits: RelTraitSet,
      call: RelOptRuleCall): Unit = 
    var transformedNode = node
    val definedTraitDefs = call.getPlanner.getRelTraitDefs
    // 处理 FlinkRelDistribution
    if (definedTraitDefs.contains(FlinkRelDistributionTraitDef.INSTANCE)) 
      val toDistribution = requiredTraits.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
      transformedNode = satisfyDistribution(flinkConvention, transformedNode, toDistribution)
    
    if (definedTraitDefs.contains(RelCollationTraitDef.INSTANCE)) 
      val toCollation = requiredTraits.getTrait(RelCollationTraitDef.INSTANCE)
      transformedNode = satisfyCollation(flinkConvention, transformedNode, toCollation)
    
    checkSatisfyRequiredTrait(transformedNode, requiredTraits)
    call.transformTo(transformedNode)
  


object FlinkExpandConversionRule 
  def satisfyDistribution(
      flinkConvention: Convention,
      node: RelNode,
      requiredDistribution: FlinkRelDistribution): RelNode = 
    val fromTraitSet = node.getTraitSet
    val fromDistribution = fromTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
    if (!fromDistribution.satisfies(requiredDistribution)) 
      requiredDistribution.getType match 
        case SINGLETON | HASH_DISTRIBUTED | RANGE_DISTRIBUTED |
             BROADCAST_DISTRIBUTED | RANDOM_DISTRIBUTED =>
          flinkConvention match 
            case FlinkConventions.BATCH_PHYSICAL =>
              // replace collation with empty since distribution destroy collation
              ......
              new BatchExecExchange(node.getCluster, traitSet, node, requiredDistribution)
            case FlinkConventions.STREAM_PHYSICAL =>
              val updateAsRetraction = fromTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
              val accMode = fromTraitSet.getTrait(AccModeTraitDef.INSTANCE)
              // replace collation with empty since distribution destroy collation
              val traitSet = fromTraitSet
                .replace(requiredDistribution)
                .replace(flinkConvention)
                .replace(RelCollations.EMPTY)
                .replace(updateAsRetraction)
                .replace(accMode)
              // 生成 StreamExecExchange
              new StreamExecExchange(node.getCluster, traitSet, node, requiredDistribution)
            case _ => throw new TableException(s"Unsupported convention: $flinkConvention")
          
        case _ => throw new TableException(s"Unsupported type: $requiredDistribution.getType")
      
     else 
      node
    
  


5.物理执行计划

在得到最终的逻辑执行计划后,需要将其转换为物理执行计划,即生成 Flink 内部的 Transformation 算子。

首先,StreamExecJoin 的输入是两个 StreamExecExchange 节点,StreamExecExchange 会生成 PartitionTransformation 算子,用来决定上游数据到下游的分布情况。根据 RelDistribution.Type 的不同,PartitionTransformationStreamPartitioner 会选择使用 GlobalPartitioner(对应 RelDistribution.Type.SINGLETON) 或是 KeyGroupStreamPartitioner(对应 RelDistribution.Type.HASH_DISTRIBUTED)。

class StreamExecExchange 

  //生成物理执行计划
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = 
    val inputTransform = getInputNodes.get(0).translateToPlan(planner)
      .asInstanceOf[Transformation[BaseRow]]
    val inputTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
    val outputTypeInfo = BaseRowTypeInfo.of(
      FlinkTypeFactory.toLogicalRowType(getRowType))
    relDistribution.getType match 
        // 如果分布是 SINGLETON(不存在等值关联条件),所有记录被转发至同一个分区
      case RelDistribution.Type.SINGLETON =>
        val partitioner = new GlobalPartitioner[BaseRow]
        val transformation = new PartitionTransformation(
          inputTransform,
          partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
        transformation.setOutputType(outputTypeInfo)
        transformation.setParallelism(1)
        transformation
      case RelDistribution.Type.HASH_DISTRIBUTED =>
        val selector = KeySelectorUtil.getBaseRowSelector(
          relDistribution.getKeys.map(_.toInt).toArray, inputTypeInfo)
        // 如果分布是 HASH(存在等值关联条件),按 HASH 分区
        val partitioner = new KeyGroupStreamPartitioner(selector,
          DEFAULT_LOWER_BOUND_MAX_PARALLELISM)
        val transformation = new PartitionTransformation(
          inputTransform,
          partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
        transformation.setOutputType(outputTypeInfo)
        transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
        transformation
      case _

以上是关于FlinkFlink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 源码阅读笔记(18)- Flink SQL 中的流和动态表

FlinkFlink 源码阅读笔记(16)- Flink SQL 的元数据管理

FlinkFlink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

FlinkFlink 源码阅读笔记(15)- Flink SQL 整体执行框架

FlinkFlink 计算资源管理

FlinkFlink 使用代码如何主动触发 SavePoint