Flink 流流关联( Interval Join)总结
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 流流关联( Interval Join)总结相关的知识,希望对你有一定的参考价值。
参考技术AFlink对于join的支持有多种支持,可参考 Flink Join类型 , 本文主要讨论Time interval join支持Table API的双流join,同时支持基于EventTime 和 ProcessingTime的的流流join。 Flink在TableApi中将流作为表使用,下文也不再区分流和表。
Flink对于interval join的支持从1.4版本开始,直到Flink1.6,经过几个版本的增强,形成了从开始的Table/Sql Api的支持,到后续DataStream Api的支持,从开始的inner join 到后来的left outer,right outer, full outerjoin的支持,算是完成了FLink对双流关联的支持,不同版本的功能支持如下:
从官方给出的Release Note来看,Flink1.4,Flink1.5中的双流join是指windowed join,但从官方给出的样例以及源码来看,此处的Windowed Join 应当指的就是interval join;鉴于Flink版本近期的变更较大,笔者不再在原有老版本中测试相关功能,下文的介绍基于Flink最新release版本1.8
在流与流的join中,与其他window join相比,window中的关联通常是两个流中对应的window中的消息可以发生关联, 不能跨window,Interval Join则没有window的概念,直接用时间戳作为关联的条件,更具表达力。由于流消息的无限性以及消息乱序的影响,本应关联上的消息可能进入处理系统的时间有较大差异,一条流中的消息,可能需要和另一条流的多条消息关联,因此流流关联时,通常需要类似如下关联条件:
其中lower bound,upperBound可设置为正值,负值,0
Interval join的实现基本逻辑比较简单,主要依靠 TimeBoundedStreamJoin 完成消息的关联,其核心逻辑主要包含消息的缓存,不同关联类型的处理,消息的清理,但实现起来并不简单,下面基于eventTime分别对以上进行分析:
由于Flink对于流关联的处理逻辑是对于两条流的消息分别处理,但两条流的处理方式是完全一致的,一下基于第一条流(左流)进行分析
假定左流中的消息l如 a,b,2019-07-22 00:00:00 ,左流的可容忍乱序时间OutOfOrder时间设置为10s,其中第三个字段为时间戳字段
FlinkFlink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现
1.概述
转载:Flink 源码阅读笔记(19)- Flink SQL 中流表 Join 的实现
在使用 SQL 进行数据分析的过程中,关联查询是经常要使用到的操作。在传统的 OLTP 和 OLAP 领域中,关联查询的数据集都是有界的,因此可以依赖于缓存有界的数据集进行查询。但是在 Streaming SQL 中,针对 Stream Join Stream 的情况,由于关联查询的两侧都是连续无界的数据流,传统数据表中 Join 操作的实现和优化方式可能并不完全适用。在这篇文章中,我们将介绍双流 Join 面临的挑战,并对 Flink SQL 中双流 Join 的具体实现机制进行分析。
2.双流 Join 的挑战
在传统的数据库或批处理场景中 ,关联查询的数据集都是有限的
,因此可以依赖于缓存有界的数据集,使用诸如 Nested-Loop Join,Sort-Merged Join 或者 Hash Join
等方法进行匹配查询。但是在 Streaming SQL 中,两个数据流的关联查询主要面临如下两个问题:一方面,数据流是无限的
,缓存数据对 long-running 的任务而言会带来较高的存储和查询压力;另一方面,两侧数据流中消息到达的时间存在不一致
的情况,可能造成关联结果的缺失。
对于上述的第一个问题,为了保证关联结果的正确性,需要将数据流中所有历史数据缓存下来。随着两个数据流中数据源源不断到来,缓存历史数据带来的开销越来越大,且每一条新到达的消息都会激发对另一侧历史数据的查询。为了解决该问题,一种方法是通过时间窗口将关联的数据范围限制在特定的时间范围内,即 Window Join(关于时间窗口可以参考之前的文章);另一种方法是,在存储开销和关联准确度方面做一下权衡,在缓存的历史数据上增加生存时间的限制,这样可以避免缓存的数据无限增长,但相应地可能会造成准确度的降低。
上述第二个问题,主要针对的是外连接
的情况。由于两侧数据到达时间的不确定性,对于特定消息,可能出现 t1 时刻不存在匹配记录,而 t2 (t2 > t1) 时刻存在匹配记录的情况。对于外连接,要求在不存在关联结果的情况下返回 NULL 值。因此为了保证关联结果的正确性,一种方式是通过时间窗口限制关联的数据范围,但这样就要求在窗口结束时才输出结果,会导致输出延迟
;另一种方式是采取“撤销-更正”的方式,先输出 NULL 值,在后续关联记录到达时再撤销已输出的记录,修正为关联的正确结果,其缺点是会造成输出记录数的放大
。
从上述的分析可以看出,时间窗口在关联查询中通过限制关联数据的范围
,可以部分程度上解决 Streaming Join 面临的问题,其基本思路是将无限的数据流切分为有限的时间窗口
。但时间窗口关联并不适合所有的情况,很多时候两个数据流的关联查询并不能限定在特定的时间窗口内;此外,时间窗口关联还存在输出延迟的情况
。
本文的后续部分将对 Flink SQL 中普通双流 Join 的实现机制加以介绍,Window Join 的实现机制将在后续的文章中进行分析。
3.双流 Join 的实现机制
一条 Join 语句的转换
首先,我们以一条简单的 Join 语句为例,跟踪一条 Join 语句的变换过程。
-- table A('a1, 'a2, 'a3)
-- table B('b1, 'b2, 'b3)
SELECT a1, b1 FROM A JOIN B ON a1 = b1 and a2 > b2
上述的 SQL 语句在经过解析后,被转换为如下的逻辑计划:
LogicalProject(a1=[$0], b1=[$3])
+- LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[inner])
:- LogicalTableScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]])
+- LogicalTableScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]])
这份逻辑计划首先被转换为 Flink SQL 内部的 RelNode,即:
FlinkLogicalCalc(select=[a1, b1])
+- FlinkLogicalJoin(condition=[AND(=($0, $2), >($1, $3))], joinType=[inner])
:- FlinkLogicalCalc(select=[a1, a2])
: +- FlinkLogicalTableSourceScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+- FlinkLogicalCalc(select=[b1, b2])
+- FlinkLogicalTableSourceScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
此后,经过一系列优化规则被优化为最终的执行计划,如下:
Calc(select=[a1, b1])
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), >(a2, b2))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a1]])
: +- Calc(select=[a1, a2])
: +- TableSourceScan(table=[[A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+- Exchange(distribution=[hash[b1]])
+- Calc(select=[b1, b2])
+- TableSourceScan(table=[[B, source: [TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
至此,逻辑计划的优化阶段结束,进入物理计划生成的阶段。
Flink SQL 会为 StreamExecJoin
操作生成一个 TwoInputTransformation
变换,内部算子为 StreamingJoinOperator
,用于在两个数据流中匹配关联记录;为 StreamExecExchange
操作生成一个 PartitionTransformation
变换,用来确定上游算子输出的记录转发到下游算子的分区。
4.两个重要的变换规则
在逻辑计划优化的过程中,有两个重要的规则需要关注,分别是 StreamExecJoinRule
和 FlinkExpandConversionRule
。
顾名思义,StreamExecJoinRule
主要用于将 FlinkLogicalJoin
转换为 StreamExecJoin
。但是这个变换是有条件限制的,即 FlinkLogicalJoin
的关联条件中不包含时间窗口
。首先来看一下这个规则的匹配条件:
class StreamExecJoinRule
extends RelOptRule(
operand(classOf[FlinkLogicalJoin],
operand(classOf[FlinkLogicalRel], any()),
operand(classOf[FlinkLogicalRel], any())),
"StreamExecJoinRule")
override def matches(call: RelOptRuleCall): Boolean =
val join: FlinkLogicalJoin = call.rel(0)
//关联结果是否需要从右表投射数据,SEMI JOIN 和 ANTI JOIN 不需要选择右表的数据
if (!join.getJoinType.projectsRight)
// SEMI/ANTI JOIN 总是被转换为 StreamExecJoin
return true
val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val joinRowType = join.getRowType
//左表不支持 Temporal Table
if (left.isInstanceOf[FlinkLogicalSnapshot])
throw new TableException(
"Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.")
//不支持 Temporal Table JOIN
if (right.isInstanceOf[FlinkLogicalSnapshot] ||
TemporalJoinUtil.containsTemporalJoinCondition(join.getCondition))
return false
//从关联条件中提取 1)时间窗口边界 2)其它条件
val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
join.getCondition,
join.getLeft.getRowType.getFieldCount,
joinRowType,
join.getCluster.getRexBuilder,
tableConfig)
//存在窗口,则不适用于该规则
if (windowBounds.isDefined)
return false
//普通关联条件不能访问时间属性
// remaining predicate must not access time attributes
val remainingPredsAccessTime = remainingPreds.isDefined &&
WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, joinRowType)
//RowTime 属性不能出现在普通 join 的关联条件中
//@see https://stackoverflow.com/questions/57181771/flink-rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join
val rowTimeAttrInOutput = joinRowType.getFieldList
.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
if (rowTimeAttrInOutput)
throw new TableException(
"Rowtime attributes must not be in the input rows of a regular join. " +
"As a workaround you can cast the time attributes of input tables to TIMESTAMP before.")
// joins require an equality condition
// or a conjunctive predicate with at least one equality condition
// and disable outer joins with non-equality predicates(see FLINK-5520)
// And do not accept a FlinkLogicalTemporalTableSourceScan as right input
!remainingPredsAccessTime
其基本逻辑就是,在普通的双流 Join 中不支持 Temporal Table,不支持时间窗口,不支持访问时间属性
。这里需要注意的一点是,在普通的双流 Join 中,Flink 没法保证关联结果按照时间先后顺序提交,会破坏时间属性的顺序
,因此在普通的双流 Join 中关联条件不支持时间属性。
StreamExecJoinRule
会将 FlinkLogicalJoin
转换为 StreamexecJoin
,但相应地,需要先对 FlinkLogicalJoin
的两个输入进行变换。在这里,会将 FlinkRelDistribution
这个 trait 下推到输入算子中。
FlinkRelDistribution
用于确定上游算子结果转发到下游算子的分区信息
。例如,如果关联条件中存在等值关联条件,那么就会按照对应的关联键进行哈希分区,确保相同键的记录被转发到相同的 Task 中,即 FlinkRelDistribution.hash
;而如果关联条件中不存在等值条件,那么所有的记录只能被转发到同一个 Task 中,即 FlinkRelDistribution.SINGLETON
。
class StreamExecJoinRule
override def onMatch(call: RelOptRuleCall): Unit =
val join: FlinkLogicalJoin = call.rel(0)
val left = join.getLeft
val right = join.getRight
//根据是否存在等值关联条件确定 FlinkRelDistribution
def toHashTraitByColumns(
columns: util.Collection[_ <: Number],
inputTraitSets: RelTraitSet): RelTraitSet =
val distribution = if (columns.isEmpty)
FlinkRelDistribution.SINGLETON
else
FlinkRelDistribution.hash(columns)
inputTraitSets
.replace(FlinkConventions.STREAM_PHYSICAL)
.replace(distribution)
val joinInfo = join.analyzeCondition()
val (leftRequiredTrait, rightRequiredTrait) = (
toHashTraitByColumns(joinInfo.leftKeys, left.getTraitSet),
toHashTraitByColumns(joinInfo.rightKeys, right.getTraitSet))
val providedTraitSet = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
//变换输入
val newLeft: RelNode = RelOptRule.convert(left, leftRequiredTrait)
val newRight: RelNode = RelOptRule.convert(right, rightRequiredTrait)
//生成 StreamExecJoin
val newJoin = new StreamExecJoin(
join.getCluster,
providedTraitSet,
newLeft,
newRight,
join.getCondition,
join.getJoinType)
call.transformTo(newJoin)
对 FlinkRelDistribution
的匹配变换规则在 FlinkExpandConversionRule
中。FlinkExpandConversionRule
的作用是处理 RelDistribution
和 RelCollation
这两种 trait,其中 RelDistribution
描述数据的物理分布情况,RelCollation
描述排序情况(通常在 Batch 模式下应用在 ORDER BY 语句中)。
在 FlinkExpandConversionRule
中会为目标 trait 包含 FlinkRelDistribution 的变换生成一个 StreamExecExchange
:
class FlinkExpandConversionRule(flinkConvention: Convention)
extends RelOptRule(
operand(classOf[AbstractConverter],
operand(classOf[RelNode], any)),
"FlinkExpandConversionRule")
override def matches(call: RelOptRuleCall): Boolean =
// from trait 和 to trait 不一致
val toTraitSet = call.rel(0).asInstanceOf[AbstractConverter].getTraitSet
val fromTraitSet = call.rel(1).asInstanceOf[RelNode].getTraitSet
toTraitSet.contains(flinkConvention) &&
fromTraitSet.contains(flinkConvention) &&
!fromTraitSet.satisfies(toTraitSet)
override def onMatch(call: RelOptRuleCall): Unit =
val converter: AbstractConverter = call.rel(0)
val child: RelNode = call.rel(1)
val toTraitSet = converter.getTraitSet
// try to satisfy required trait by itself.
satisfyTraitsBySelf(child, toTraitSet, call)
// try to push down required traits to children.
satisfyTraitsByInput(child, toTraitSet, call)
private def satisfyTraitsBySelf(
node: RelNode,
requiredTraits: RelTraitSet,
call: RelOptRuleCall): Unit =
var transformedNode = node
val definedTraitDefs = call.getPlanner.getRelTraitDefs
// 处理 FlinkRelDistribution
if (definedTraitDefs.contains(FlinkRelDistributionTraitDef.INSTANCE))
val toDistribution = requiredTraits.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
transformedNode = satisfyDistribution(flinkConvention, transformedNode, toDistribution)
if (definedTraitDefs.contains(RelCollationTraitDef.INSTANCE))
val toCollation = requiredTraits.getTrait(RelCollationTraitDef.INSTANCE)
transformedNode = satisfyCollation(flinkConvention, transformedNode, toCollation)
checkSatisfyRequiredTrait(transformedNode, requiredTraits)
call.transformTo(transformedNode)
object FlinkExpandConversionRule
def satisfyDistribution(
flinkConvention: Convention,
node: RelNode,
requiredDistribution: FlinkRelDistribution): RelNode =
val fromTraitSet = node.getTraitSet
val fromDistribution = fromTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
if (!fromDistribution.satisfies(requiredDistribution))
requiredDistribution.getType match
case SINGLETON | HASH_DISTRIBUTED | RANGE_DISTRIBUTED |
BROADCAST_DISTRIBUTED | RANDOM_DISTRIBUTED =>
flinkConvention match
case FlinkConventions.BATCH_PHYSICAL =>
// replace collation with empty since distribution destroy collation
......
new BatchExecExchange(node.getCluster, traitSet, node, requiredDistribution)
case FlinkConventions.STREAM_PHYSICAL =>
val updateAsRetraction = fromTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
val accMode = fromTraitSet.getTrait(AccModeTraitDef.INSTANCE)
// replace collation with empty since distribution destroy collation
val traitSet = fromTraitSet
.replace(requiredDistribution)
.replace(flinkConvention)
.replace(RelCollations.EMPTY)
.replace(updateAsRetraction)
.replace(accMode)
// 生成 StreamExecExchange
new StreamExecExchange(node.getCluster, traitSet, node, requiredDistribution)
case _ => throw new TableException(s"Unsupported convention: $flinkConvention")
case _ => throw new TableException(s"Unsupported type: $requiredDistribution.getType")
else
node
5.物理执行计划
在得到最终的逻辑执行计划后,需要将其转换为物理执行计划,即生成 Flink 内部的 Transformation 算子。
首先,StreamExecJoin
的输入是两个 StreamExecExchange
节点,StreamExecExchange
会生成 PartitionTransformation
算子,用来决定上游数据到下游的分布情况。根据 RelDistribution.Type
的不同,PartitionTransformation
的 StreamPartitioner
会选择使用 GlobalPartitioner
(对应 RelDistribution.Type.SINGLETON
) 或是 KeyGroupStreamPartitioner
(对应 RelDistribution.Type.HASH_DISTRIBUTED
)。
class StreamExecExchange
//生成物理执行计划
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] =
val inputTransform = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[BaseRow]]
val inputTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
val outputTypeInfo = BaseRowTypeInfo.of(
FlinkTypeFactory.toLogicalRowType(getRowType))
relDistribution.getType match
// 如果分布是 SINGLETON(不存在等值关联条件),所有记录被转发至同一个分区
case RelDistribution.Type.SINGLETON =>
val partitioner = new GlobalPartitioner[BaseRow]
val transformation = new PartitionTransformation(
inputTransform,
partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
transformation.setOutputType(outputTypeInfo)
transformation.setParallelism(1)
transformation
case RelDistribution.Type.HASH_DISTRIBUTED =>
val selector = KeySelectorUtil.getBaseRowSelector(
relDistribution.getKeys.map(_.toInt).toArray, inputTypeInfo)
// 如果分布是 HASH(存在等值关联条件),按 HASH 分区
val partitioner = new KeyGroupStreamPartitioner(selector,
DEFAULT_LOWER_BOUND_MAX_PARALLELISM)
val transformation = new PartitionTransformation(
inputTransform,
partitioner.asInstanceOf[StreamPartitioner[BaseRow]])
transformation.setOutputType(outputTypeInfo)
transformation.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT)
transformation
case _以上是关于Flink 流流关联( Interval Join)总结的主要内容,如果未能解决你的问题,请参考以下文章
Flink Interval Join,Temporal Join,Lookup Join区别
Flink Interval Join,Temporal Join,Lookup Join区别