Apache Flink源码解析之stream-operator

Posted vinoYang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink源码解析之stream-operator相关的知识,希望对你有一定的参考价值。

前面我们谈论了Flink stream中的transformation。你可以将transformation看成编写Flink程序并构建流式处理程序的必要组成部分(静态表现形式);而本篇我们将探讨transformation在Flink运行时对应的动态表现形式——operator。他们之间的映射关系见下图:

技术分享

具体的探讨可以查看前文:Flink中的一些核心概念

StreamOperator

所有operator的最终基类,operator的分类方式,按照输入流个数不同分为:

  • 无输入:StreamSource
  • 单个流输入:OneInputStreamOperator
  • 两个流输入:TwoInputStreamOperator

跟生命周期有关的核心抽象方法:

  • setup : 实例化operator
  • open :该方法会在任何元素被处理之前执行,它的实现通常包含了operator的初始化逻辑
  • close :该方法在所有的元素都进入到operator被处理之后调用
  • dispose :该方法在operator生命周期的最后阶段执行,主要用于回收资源

StreamOperator及其实现类中还包含了一些状态恢复与保存相关的逻辑,但这些不是本文的主题,所有暂时不做探讨。

先来看一下整个package的类关系图:

技术分享

我们整个剖析方式大致也按照以上operator的分类方式以及类的层次结构来。

StreamSource

作为一个流处理DAG的起点,source operator相比其他operator无疑是特别的(从类的继承关系图也可以看出来)。

它需要接受SourceFunction的实例。并且我们可以看到,它的chaining strategyHEAD(它表示operator不能有前置operator,但可以作为其他operator的前置operator,下文会谈到)。

this.chainingStrategy = ChainingStrategy.HEAD;

StreamSource的实现略显复杂,因为它涉及到我们前面文章谈SourceFunction时谈到的SourceFunction.SourceContext的实现。在这里提供了三个实现,分别对应我们之前谈到的Flink对事件时间的三个分类:

技术分享

  • NonTimestampContext:针对ProcessingTime,该SourceContext时间戳设置为-1,并且不发射watermark
  • AutomaticWatermarkContext:针对IngestionTime,提供自动的watermark发射机制的SourceContext
  • ManualWatermarkContext:针对EventTime的人工发射watermarkSourceContext

它们之间的对应关系也体现在其run方法的实现中:

        switch (timeCharacteristic) {
            case EventTime:
                ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
                break;
            case IngestionTime:
                ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
                        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                ctx = new NonTimestampContext<>(this, lockingObject, collector);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

run方法内部会调用SourceFunctionrun方法:

try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
        }

StreamSource通过一个属性:canceledOrStopped来控制sourceFunction的停止。

整个StreamSource的运行逻辑由run来表述,通过cancel来控制停止逻辑。

NonTimestampContext

NonTimestampContext会忽略时间戳,因此它的实现里稍微特别一点的地方在下面的这两个方法:

public void collectWithTimestamp(T element, long timestamp) {
    // ignore the timestamp
    collect(element);
}

以及

public void emitWatermark(Watermark mark) {
    owner.checkAsyncException();
    // do nothing else
}

第一个方法忽略了时间戳,第二个方法不发送watermark

ManualWatermarkContext

无需特别说明

AutomaticWatermarkContext

该类是自动发送watermark的实现,在构造器中接收参数watermarkInterval来指定自动发送watermark的时间间隔。具体的实现机制是,新建一个独立的发射线程,以指定的时间间隔发射:

            this.scheduleExecutor = Executors.newScheduledThreadPool(1);

            this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    final long currentTime = System.currentTimeMillis();

                    if (currentTime > nextWatermarkTime) {
                        // align the watermarks across all machines. this will ensure that we
                        // don‘t have watermarks that creep along at different intervals because
                        // the machine clocks are out of sync
                        final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                        synchronized (lockingObjectParam) {
                            if (currentTime > nextWatermarkTime) {
                                outputParam.emitWatermark(new Watermark(watermarkTime));
                                nextWatermarkTime += watermarkInterval;
                            }
                        }
                    }
                }
            }, 0, watermarkInterval, TimeUnit.MILLISECONDS);

除了这种基于时间的以固定频率发射watermark的机制,在collect方法被调用时,也会检查当前的时间戳,如果达到发送条件也会触发emit watermark

而因为该类实现的是自动发送,在构造器中实现一个定时发送机制,所以emitWatermark方法也就不需要再实现发送逻辑(因为已不再需要用户程序调用emitWatermark方法了),而该方法在该类中的主要任务是负责停止自动发送。停止自动发送的触发条件是收到最后一个元素的信号(将最后一个元素的时间戳设置为Long.MAX_VALUE),emitWatermark收到该标识后,再将其往下游传递并关闭定时发送线程。

OneInputStreamOperator

单一输入流的operator接口,继承自StreamOperator。提供了两个接口方法:

  • processElement:处理到达该operator的一个元素
  • processWatermark:处理一个Watermark

TwoInputStreamOperator

支持两个流作为输入的operator,同样继承自StreamOperator。扩充了多个接口方法:

  • processElement1 : 处理来自第一个输入的某个元素
  • processElement2 : 处理来自第二个输入的某个元素
  • processWatermark1 : 处理来自第一个输入的一个Watermark
  • processWatermark2 : 处理来自第二个输入的一个Watermark

辅助实现类

Output

Collector的扩展,增加了发射WaterMark的功能。该接口主要供operator用于发射元素或者WaterMark

  • emitWatermark : 该发射WaterMark将广播给下游的所有operator

TimeCharacteristic

Flink在涉及到时间相关的处理时,将时间划分为三类。而时间类型的定义在Flink中就是用该枚举来表示:

  • ProcessingTime
  • IngestionTime
  • EventTime

这三种时间类型之前我们曾多次提及,这里不再啰嗦

TimestampedCollector

Output的包装器实现,它用于给元素设置时间戳

AbstractStreamOperator

该抽象类为实现一个具体的operator提供基本的支持,Flink内置提供的operator全部都直接或间接继承自AbstractStreamOperator

它内部包含了三大类的属性:

  • 配置属性
  • 运行时属性
  • 键值对状态属性

大都数方法都是辅助方法,值得一提的是setup方法。从这里我们可以看到所有operator标识符的生成方式:

String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

可以看到标识是由”_”间隔的三段拼接而成。三段分别是:类名,vertex id,以及当前subtask的索引。

然后基于此标识,创建了用于存储状态的stateBackend

stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

stateBackenddispose方法中会被关闭。

AbstractStreamOperator并没有对open/close等生命周期方法提供具体的实现,这些方法的具体实现被后延至后面谈到的AbstractUdfStreamOperator中。

AbstractUdfStreamOperator

该类主要针对operator生命周期相关的方法(open/close/dispose)提供了模板实现。而这些实现都统一针对用户定义的Function的实例(简称udf)。

ChainingStrategy

该枚举定义了operatorchain strategy(链接策略)。当一个operator链接到其前置operator时,意味着它们将在同一个线程上执行。StreamOperator的默认值是HEAD,这意味着它将没有前置operator,不过它有可能成为其他operator的前置operator。大部分StreamOperator将该枚举以ALWAYS覆盖,表示它们将链接到一个前置operator

它的三个枚举值:

  • ALWAYS :上面已经提到过,它允许将当前operator链接到某前置operator,这是提升性能的良好实践,它能够提升operator的并行度
  • NEVER :该策略不支持operator被链接到某前置operator也不支持被作为其他operator的前置operator
  • HEAD :该策略表示operator没有前置operator,不过可以作为其他operatorchain header

内置的Operator实现

StreamCounter

元素累加器,没有什么特别的

StreamProject

这里需要解释一下,此处的project,并非通常所指的项目的意思,而是投射、投影的意思。你可以将其类比于SQL中的SELECT子句。因此他允许你选择你需要的fields集合。这通过其构造器的一个字段索引数组来指定:

processElement方法中,它依次遍历所有需要的字段索引,将元素中需要的字段提取出来,放入一个用于输出的outTuple,最后再将其发射出去:

    public void processElement(StreamRecord<IN> element) throws Exception {
        for (int i = 0; i < this.numFields; i++) {
            outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
        }
        output.collect(element.replace(outTuple));
    }

StreamFilter

filter operator,处理逻辑很简单,根据自定义的FilterFunction方法,对每个元素进行过滤,如果满足过滤条件,则将该元素emit出去。

StreamMap

map operator,根据传入的MapFunction,对每个元素应用map操作后将其发射出去。

StreamFlatMap

flatmap operator接收FlatMapFunction函数,有一些特别之处:在其open方法中,它初始化了一个TimestampedCollector,作为传递给FlatMapFunctioncollector,该collector是给那些特定的userFunction使用的,并且用于给他们操作的元素设置时间戳。

StreamGroupedFold

分组的fold operatorfold函数的执行依赖于一个初始化值initialValue。因此这里涉及到状态保存。并且状态是跟具体的分区关联的。因此,在open方法的实现中,需要获得跟分区关联的ValueState

        ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
        values = getPartitionedState(stateId);

processElement方法的实现,涉及到一系列的操作:从ValueState中获取数据,作为“新”的初始值跟当前元素一起进行fold函数运算,获得结果后更新ValueState,然后将获得的结果emit出去。

StreamGroupedReduce

按分组进行reduce操作的operator.

基于特定的状态名称:

private static final String STATE_NAME = "_op_state";

构建状态id

ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);

然后再获取状态值:

values = getPartitionedState(stateId);

以上两个动作在open方法中实现

processElement方法中,分为两种情况:

  • 如果之前已存在状态值,那么拿当前值跟之前的状态值做reduce并获得结果,将结果再次更新到最新状态并emit出去
  • 如果之前不存在状态值,那么直接将当前值更新到状态中,并将当前值emit出去

StreamSink

sink operator,通常是流处理的最后一个operator。它接收SinkFunction的实例。在processElement中依次调用其invoke方法。

小结

本文主要探讨了stream transformation的运行时形式operator的大致实现。


微信扫码关注公众号:Apache_Flink

技术分享


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

技术分享

以上是关于Apache Flink源码解析之stream-operator的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.13 源码解析——TaskManager启动流程 之 初始化TaskExecutor

Flink从入门到放弃之源码解析系列-第2章 Flink执行计划生成

Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)

Flink SQL 之 Calcite Volcano优化器(源码解析)

Flink1.15源码解析--启动JobManager----ResourceManager启动

Flink1.15源码解析--启动JobManager----ResourceManager启动