Apache Flink源码解析之stream-operator
Posted vinoYang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink源码解析之stream-operator相关的知识,希望对你有一定的参考价值。
前面我们谈论了Flink stream中的transformation。你可以将transformation
看成编写Flink程序并构建流式处理程序的必要组成部分(静态表现形式);而本篇我们将探讨transformation
在Flink运行时对应的动态表现形式——operator。他们之间的映射关系见下图:
具体的探讨可以查看前文:Flink中的一些核心概念
StreamOperator
所有operator
的最终基类,operator
的分类方式,按照输入流个数不同分为:
- 无输入:StreamSource
- 单个流输入:OneInputStreamOperator
- 两个流输入:TwoInputStreamOperator
跟生命周期有关的核心抽象方法:
- setup : 实例化
operator
- open :该方法会在任何元素被处理之前执行,它的实现通常包含了
operator
的初始化逻辑 - close :该方法在所有的元素都进入到
operator
被处理之后调用 - dispose :该方法在
operator
生命周期的最后阶段执行,主要用于回收资源
StreamOperator
及其实现类中还包含了一些状态恢复与保存相关的逻辑,但这些不是本文的主题,所有暂时不做探讨。
先来看一下整个package的类关系图:
我们整个剖析方式大致也按照以上operator
的分类方式以及类的层次结构来。
StreamSource
作为一个流处理DAG的起点,source operator
相比其他operator
无疑是特别的(从类的继承关系图也可以看出来)。
它需要接受SourceFunction
的实例。并且我们可以看到,它的chaining strategy
是HEAD
(它表示operator
不能有前置operator
,但可以作为其他operator
的前置operator
,下文会谈到)。
this.chainingStrategy = ChainingStrategy.HEAD;
StreamSource
的实现略显复杂,因为它涉及到我们前面文章谈SourceFunction
时谈到的SourceFunction.SourceContext
的实现。在这里提供了三个实现,分别对应我们之前谈到的Flink对事件时间的三个分类:
- NonTimestampContext:针对
ProcessingTime
,该SourceContext
将时间戳设置为-1,并且不发射watermark
- AutomaticWatermarkContext:针对
IngestionTime
,提供自动的watermark
发射机制的SourceContext
- ManualWatermarkContext:针对
EventTime
的人工发射watermark
的SourceContext
它们之间的对应关系也体现在其run
方法的实现中:
switch (timeCharacteristic) {
case EventTime:
ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
break;
case IngestionTime:
ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(this, lockingObject, collector);
break;
default:
throw new Exception(String.valueOf(timeCharacteristic));
}
在run
方法内部会调用SourceFunction
的run
方法:
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
}
StreamSource
通过一个属性:canceledOrStopped
来控制sourceFunction
的停止。
整个StreamSource
的运行逻辑由run
来表述,通过cancel
来控制停止逻辑。
NonTimestampContext
NonTimestampContext
会忽略时间戳,因此它的实现里稍微特别一点的地方在下面的这两个方法:
public void collectWithTimestamp(T element, long timestamp) {
// ignore the timestamp
collect(element);
}
以及
public void emitWatermark(Watermark mark) {
owner.checkAsyncException();
// do nothing else
}
第一个方法忽略了时间戳,第二个方法不发送watermark
。
ManualWatermarkContext
无需特别说明
AutomaticWatermarkContext
该类是自动发送watermark
的实现,在构造器中接收参数watermarkInterval
来指定自动发送watermark
的时间间隔。具体的实现机制是,新建一个独立的发射线程,以指定的时间间隔发射:
this.scheduleExecutor = Executors.newScheduledThreadPool(1);
this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final long currentTime = System.currentTimeMillis();
if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don‘t have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
synchronized (lockingObjectParam) {
if (currentTime > nextWatermarkTime) {
outputParam.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}
}
}
}
}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
除了这种基于时间的以固定频率发射watermark
的机制,在collect方法被调用时,也会检查当前的时间戳,如果达到发送条件也会触发emit watermark
。
而因为该类实现的是自动发送,在构造器中实现一个定时发送机制,所以emitWatermark
方法也就不需要再实现发送逻辑(因为已不再需要用户程序调用emitWatermark
方法了),而该方法在该类中的主要任务是负责停止自动发送。停止自动发送的触发条件是收到最后一个元素的信号(将最后一个元素的时间戳设置为Long.MAX_VALUE
),emitWatermark
收到该标识后,再将其往下游传递并关闭定时发送线程。
OneInputStreamOperator
单一输入流的operator
接口,继承自StreamOperator
。提供了两个接口方法:
- processElement:处理到达该
operator
的一个元素 - processWatermark:处理一个
Watermark
TwoInputStreamOperator
支持两个流作为输入的operator
,同样继承自StreamOperator
。扩充了多个接口方法:
- processElement1 : 处理来自第一个输入的某个元素
- processElement2 : 处理来自第二个输入的某个元素
- processWatermark1 : 处理来自第一个输入的一个
Watermark
- processWatermark2 : 处理来自第二个输入的一个
Watermark
辅助实现类
Output
Collector
的扩展,增加了发射WaterMark
的功能。该接口主要供operator
用于发射元素或者WaterMark
。
- emitWatermark : 该发射
WaterMark
将广播给下游的所有operator
TimeCharacteristic
Flink在涉及到时间相关的处理时,将时间划分为三类。而时间类型的定义在Flink中就是用该枚举来表示:
- ProcessingTime
- IngestionTime
- EventTime
这三种时间类型之前我们曾多次提及,这里不再啰嗦
TimestampedCollector
Output
的包装器实现,它用于给元素设置时间戳
AbstractStreamOperator
该抽象类为实现一个具体的operator
提供基本的支持,Flink内置提供的operator
全部都直接或间接继承自AbstractStreamOperator
。
它内部包含了三大类的属性:
- 配置属性
- 运行时属性
- 键值对状态属性
大都数方法都是辅助方法,值得一提的是setup
方法。从这里我们可以看到所有operator
标识符的生成方式:
String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();
可以看到标识是由”_”间隔的三段拼接而成。三段分别是:类名,vertex id
,以及当前subtask
的索引。
然后基于此标识,创建了用于存储状态的stateBackend
:
stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);
stateBackend
在 dispose
方法中会被关闭。
AbstractStreamOperator
并没有对open/close等生命周期方法提供具体的实现,这些方法的具体实现被后延至后面谈到的AbstractUdfStreamOperator
中。
AbstractUdfStreamOperator
该类主要针对operator
生命周期相关的方法(open/close/dispose)提供了模板实现。而这些实现都统一针对用户定义的Function
的实例(简称udf
)。
ChainingStrategy
该枚举定义了operator
的chain strategy
(链接策略)。当一个operator
链接到其前置operator
时,意味着它们将在同一个线程上执行。StreamOperator
的默认值是HEAD
,这意味着它将没有前置operator
,不过它有可能成为其他operator
的前置operator
。大部分StreamOperator
将该枚举以ALWAYS
覆盖,表示它们将链接到一个前置operator
。
它的三个枚举值:
- ALWAYS :上面已经提到过,它允许将当前
operator
链接到某前置operator
,这是提升性能的良好实践,它能够提升operator
的并行度 - NEVER :该策略不支持
operator
被链接到某前置operator
也不支持被作为其他operator
的前置operator
。 - HEAD :该策略表示
operator
没有前置operator
,不过可以作为其他operator
的chain header
内置的Operator实现
StreamCounter
元素累加器,没有什么特别的
StreamProject
这里需要解释一下,此处的project,并非通常所指的项目的意思,而是投射、投影
的意思。你可以将其类比于SQL中的SELECT
子句。因此他允许你选择你需要的fields
集合。这通过其构造器的一个字段索引数组来指定:
在processElement
方法中,它依次遍历所有需要的字段索引,将元素中需要的字段提取出来,放入一个用于输出的outTuple
,最后再将其发射出去:
public void processElement(StreamRecord<IN> element) throws Exception {
for (int i = 0; i < this.numFields; i++) {
outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
}
output.collect(element.replace(outTuple));
}
StreamFilter
filter operator
,处理逻辑很简单,根据自定义的FilterFunction
方法,对每个元素进行过滤,如果满足过滤条件,则将该元素emit
出去。
StreamMap
map operator
,根据传入的MapFunction
,对每个元素应用map
操作后将其发射出去。
StreamFlatMap
flatmap operator
接收FlatMapFunction
函数,有一些特别之处:在其open
方法中,它初始化了一个TimestampedCollector
,作为传递给FlatMapFunction
的collector
,该collector
是给那些特定的userFunction
使用的,并且用于给他们操作的元素设置时间戳。
StreamGroupedFold
分组的fold operator
,fold
函数的执行依赖于一个初始化值initialValue
。因此这里涉及到状态保存。并且状态是跟具体的分区关联的。因此,在open
方法的实现中,需要获得跟分区关联的ValueState
:
ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
values = getPartitionedState(stateId);
processElement
方法的实现,涉及到一系列的操作:从ValueState
中获取数据,作为“新”的初始值跟当前元素一起进行fold
函数运算,获得结果后更新ValueState
,然后将获得的结果emit
出去。
StreamGroupedReduce
按分组进行reduce
操作的operator
.
基于特定的状态名称:
private static final String STATE_NAME = "_op_state";
构建状态id
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);
然后再获取状态值:
values = getPartitionedState(stateId);
以上两个动作在open
方法中实现
在processElement
方法中,分为两种情况:
- 如果之前已存在状态值,那么拿当前值跟之前的状态值做
reduce
并获得结果,将结果再次更新到最新状态并emit
出去 - 如果之前不存在状态值,那么直接将当前值更新到状态中,并将当前值
emit
出去
StreamSink
sink operator
,通常是流处理的最后一个operator
。它接收SinkFunction
的实例。在processElement
中依次调用其invoke
方法。
小结
本文主要探讨了stream transformation
的运行时形式operator
的大致实现。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
以上是关于Apache Flink源码解析之stream-operator的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.13 源码解析——TaskManager启动流程 之 初始化TaskExecutor
Flink从入门到放弃之源码解析系列-第2章 Flink执行计划生成
Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)
Flink SQL 之 Calcite Volcano优化器(源码解析)