Flink Source Code: OperatorChain

Posted by 九师兄


1. Overview

Reposted from: Flink Source Code: OperatorChain

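Preface

OperatorChain is an important optimization in Flink. It runs as many eligible data-processing operations as possible back to back inside a single slot, which minimizes thread context switches and network communication and improves the performance of the stream-processing system.

The logic that decides which operations can be placed in the same chain lives in the JobGraph generation process. For details, see Flink Source Code: JobGraph Generation.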

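Terminology

  • StreamEdge: an element of the StreamGraph topology. A StreamGraph is a DAG made up of StreamNodes and StreamEdges. For details, see Flink Source Code: StreamGraph Generation.
  • RecordWriterOutput: a type of operator output (Output) that writes data to a ResultPartition through a RecordWriter.
  • ChainingOutput: like RecordWriterOutput, a type of operator output, except that ChainingOutput is used only inside an OperatorChain. It acts as a bridge that hands the records processed by the upstream operator to the downstream operator. A later section analyzes it in detail.
  • TypeSerializer: reads bytes from a DataInputView and deserializes them into type T, or serializes a value of type T into a DataOutputView. Both DataInputView and DataOutputView operate directly on byte arrays, and the actual storage of those byte arrays is backed by MemorySegment.
  • StreamOperatorWrapper: wraps a StreamOperator and is used only by OperatorChain. It holds two pointers, one to the previous operator and one to the next, which together form a doubly linked list. This is where the notion of a "chain" comes from.

Next, we start the analysis from the OperatorChain constructor.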
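To make the ChainingOutput entry above more concrete, here is a minimal, self-contained sketch of the idea: inside a chain, an operator's output hands each record to the next operator through an ordinary method call instead of through the network. ChainingSketch, MapOperator, and DirectOutput are hypothetical names invented for this illustration; they are not Flink classes, and the real ChainingOutput does considerably more (metrics, watermarks, side outputs).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model; none of these types are Flink classes.
final class ChainingSketch {

    /** Minimal stand-in for an operator output. */
    interface Output<T> {
        void collect(T record);
    }

    /** Minimal stand-in for a one-input operator that applies a function to each record. */
    static final class MapOperator<I, O> {
        private final Function<I, O> fn;
        private final Output<O> output;

        MapOperator(Function<I, O> fn, Output<O> output) {
            this.fn = fn;
            this.output = output;
        }

        void processElement(I record) {
            output.collect(fn.apply(record));
        }
    }

    /** Plays the role of a chaining output: forwards a record to the next operator by a plain call. */
    static final class DirectOutput<I> implements Output<I> {
        private final MapOperator<I, ?> next;

        DirectOutput(MapOperator<I, ?> next) {
            this.next = next;
        }

        @Override
        public void collect(I record) {
            next.processElement(record);
        }
    }

    /** Builds the chain (x + 1) -> (x * 2) and returns everything that leaves the chain. */
    static List<Integer> run(List<Integer> input) {
        List<Integer> sink = new ArrayList<>();
        // Built from downstream to upstream, because each operator needs its output first.
        MapOperator<Integer, Integer> doubler = new MapOperator<>(x -> x * 2, sink::add);
        MapOperator<Integer, Integer> increment =
                new MapOperator<>(x -> x + 1, new DirectOutput<Integer>(doubler));
        for (Integer value : input) {
            increment.processElement(value);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // prints [4, 6, 8]
    }
}
```

Calling ChainingSketch.run(List.of(1, 2, 3)) pushes every value through both operators on the caller's thread and returns [4, 6, 8]. No serialization or buffering happens between the two operators, which is the saving that chaining aims for.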

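Constructor

OperatorChain is built in the beforeInvoke method of StreamTask (see Flink Source Code: StreamTask). StreamTask then obtains the operator at the head of the chain (the mainOperator of the OperatorChain; how the chained operators are created is analyzed later). When data arrives, it is handed to mainOperator for processing.

The OperatorChain constructor, with annotations, is as follows: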

public OperatorChain(
    StreamTask<OUT, OP> containingTask,
    RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    // Create the dispatcher that sends and receives OperatorEvents
    this.operatorEventDispatcher =
        new OperatorEventDispatcherImpl(
            containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
            containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    // Get the user-code class loader
    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    // Get the configuration of the task
    final StreamConfig configuration = containingTask.getConfiguration();

    // Get the StreamOperator factory of the StreamTask
    StreamOperatorFactory<OUT> operatorFactory =
        configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    // Get the StreamConfig of every StreamOperator in the OperatorChain; the map key is the vertexID
    Map<Integer, StreamConfig> chainedConfigs =
        configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    // Get the StreamEdges of the tasks, ordered by data flow
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
        new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
        // Create the chain outputs.
        // This initializes streamOutputMap, which stores the mapping
        // between each step's StreamEdge and its output
        createChainOutputs(
            outEdgesInOrder,
            recordWriterDelegate,
            chainedConfigs,
            containingTask,
            streamOutputMap);

        // we create the chain of operators and grab the collector that leads into the chain
        // Create the collection that holds all operator wrappers
        List<StreamOperatorWrapper<?, ?>> allOpWrappers =
            new ArrayList<>(chainedConfigs.size());
        // Create the output of mainOperator.
        // mainOperator is the entry operator of the OperatorChain.
        // Through ChainingOutput it links all operators of the OperatorChain in data-flow order
        this.mainOperatorOutput =
            createOutputCollector(
                containingTask,
                configuration,
                chainedConfigs,
                userCodeClassloader,
                streamOutputMap,
                allOpWrappers,
                containingTask.getMailboxExecutorFactory());

        if (operatorFactory != null) {
            // Create mainOperator and its time service
            Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                StreamOperatorFactoryUtil.createOperator(
                    operatorFactory,
                    containingTask,
                    configuration,
                    mainOperatorOutput,
                    operatorEventDispatcher);

            OP mainOperator = mainOperatorAndTimeService.f0;
            // Register the watermark metric
            mainOperator
                .getMetricGroup()
                .gauge(
                    MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                    mainOperatorOutput.getWatermarkGauge());
            // Create mainOperatorWrapper
            this.mainOperatorWrapper =
                createOperatorWrapper(
                    mainOperator,
                    containingTask,
                    configuration,
                    mainOperatorAndTimeService.f1,
                    true);

            // add main operator to end of chain
            // Append mainOperatorWrapper to the end of the list
            allOpWrappers.add(mainOperatorWrapper);

            // createOutputCollector wraps each operator in an operatorWrapper
            // and adds it to allOpWrappers in the reverse of data-flow order,
            // so the tail operatorWrapper is the element at index 0
            this.tailOperatorWrapper = allOpWrappers.get(0);
        } else {
            // The OperatorFactory is null
            checkState(allOpWrappers.size() == 0);
            this.mainOperatorWrapper = null;
            this.tailOperatorWrapper = null;
        }

        // Create the chained sources
        this.chainedSources =
            createChainedSources(
                containingTask,
                configuration.getInputs(userCodeClassloader),
                chainedConfigs,
                userCodeClassloader,
                allOpWrappers);

        this.numOperators = allOpWrappers.size();

        // Link all StreamOperatorWrappers, from upstream to downstream, into a doubly linked list
        firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

        success = true;
    } finally {
        // make sure we clean up after ourselves in case of a failure after acquiring
        // the first resources
        if (!success) {
            for (RecordWriterOutput<?> output : this.streamOutputs) {
                if (output != null) {
                    output.close();
                }
            }
        }
    }
}
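The constructor guards its resources with a success flag plus try/finally: anything created before a failure is closed, while a fully successful construction leaves everything open. The following is a small stand-alone sketch of that pattern only. CleanupOnFailureSketch, TrackedOutput, and the failAt parameter are hypothetical names made up for the illustration and do not appear in Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only mirrors the success-flag cleanup pattern.
final class CleanupOnFailureSketch {

    /** Stand-in for an output that owns a resource. */
    static final class TrackedOutput implements AutoCloseable {
        final String name;
        boolean closed;

        TrackedOutput(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    final TrackedOutput[] outputs;

    /** Creates numOutputs outputs; throws when reaching index failAt (use -1 for no failure). */
    CleanupOnFailureSketch(int numOutputs, int failAt, List<TrackedOutput> created) {
        this.outputs = new TrackedOutput[numOutputs];
        boolean success = false;
        try {
            for (int i = 0; i < numOutputs; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure at output " + i);
                }
                outputs[i] = new TrackedOutput("out-" + i);
                created.add(outputs[i]);
            }
            success = true;
        } finally {
            // Runs on both paths, but only cleans up when construction did not finish.
            if (!success) {
                for (TrackedOutput output : outputs) {
                    if (output != null) {
                        output.close();
                    }
                }
            }
        }
    }

    /** Returns true if every output created before the simulated failure was closed. */
    static boolean allClosedAfterFailure() {
        List<TrackedOutput> created = new ArrayList<>();
        try {
            new CleanupOnFailureSketch(3, 2, created);
            return false; // not reached: the constructor throws
        } catch (IllegalStateException expected) {
            return created.size() == 2 && created.stream().allMatch(o -> o.closed);
        }
    }
}
```

The null check in the cleanup loop matters because the array is allocated up front and the slots after the failure point are still empty, just as streamOutputs may be only partly filled in the constructor above.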

createChainOutputs

The createChainOutputs method creates a streamOutput for each StreamEdge and stores the mapping between the two. The code is as follows:

private void createChainOutputs(
    List<StreamEdge> outEdgesInOrder,
    RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
    Map<Integer, StreamConfig> chainedConfigs,
    StreamTask<OUT, OP> containingTask,
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
    // Iterate over the ordered StreamEdges
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge outEdge = outEdgesInOrder.get(i);

        // Create the streamOutput
        RecordWriterOutput<?> streamOutput =
            createStreamOutput(
                recordWriterDelegate.getRecordWriter(i),
                outEdge,
                chainedConfigs.get(outEdge.getSourceId()),
                containingTask.getEnvironment());

        // Update the streamOutputs array
        this.streamOutputs[i] = streamOutput;
        // Store the mapping between the StreamEdge and its streamOutput
        streamOutputMap.put(outEdge, streamOutput);
    }
}

Next, we look at the createStreamOutput method:

private RecordWriterOutput<OUT> createStreamOutput(
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
    StreamEdge edge,
    StreamConfig upStreamConfig,
    Environment taskEnvironment) {
    // Get the output tag; it is null when no side output is configured
    OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

    TypeSerializer outSerializer = null;

    // Pick the type serializer depending on whether this is a side output
    if (edge.getOutputTag() != null) {
        // side output
        outSerializer =
            upStreamConfig.getTypeSerializerSideOut(
                edge.getOutputTag(),
                taskEnvironment.getUserCodeClassLoader().asClassLoader());
    } else {
        // main output
        outSerializer =
            upStreamConfig.getTypeSerializerOut(
                taskEnvironment.getUserCodeClassLoader().asClassLoader());
    }

    // Return the newly created RecordWriterOutput
    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
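Taken together, createChainOutputs and createStreamOutput register each output twice (by writer index and by edge) and pick a serializer based on whether the edge carries a side-output tag. The sketch below models only that bookkeeping. Edge, VertexConfig, and NetworkOutput are hypothetical stand-ins for StreamEdge, StreamConfig, and RecordWriterOutput, and serializers are represented by plain strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; Edge, VertexConfig and NetworkOutput are not Flink classes.
final class ChainOutputsSketch {

    /** Stand-in for StreamEdge. A null tag marks the main output. */
    static final class Edge {
        final int sourceId;
        final String sideOutputTag;

        Edge(int sourceId, String sideOutputTag) {
            this.sourceId = sourceId;
            this.sideOutputTag = sideOutputTag;
        }
    }

    /** Stand-in for StreamConfig: serializer names for the main output and each side output. */
    static final class VertexConfig {
        final String mainSerializer;
        final Map<String, String> sideSerializers;

        VertexConfig(String mainSerializer, Map<String, String> sideSerializers) {
            this.mainSerializer = mainSerializer;
            this.sideSerializers = sideSerializers;
        }
    }

    /** Stand-in for RecordWriterOutput. */
    static final class NetworkOutput {
        final int writerIndex;
        final String serializer;

        NetworkOutput(int writerIndex, String serializer) {
            this.writerIndex = writerIndex;
            this.serializer = serializer;
        }
    }

    /** Fills outputsByIndex and returns the edge-to-output map, one output per ordered edge. */
    static Map<Edge, NetworkOutput> createOutputs(
            List<Edge> outEdgesInOrder,
            Map<Integer, VertexConfig> chainedConfigs,
            NetworkOutput[] outputsByIndex) {
        Map<Edge, NetworkOutput> outputsByEdge = new HashMap<>(outEdgesInOrder.size());
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            Edge edge = outEdgesInOrder.get(i);
            // The serializer comes from the config of the edge's upstream (source) vertex.
            VertexConfig upstream = chainedConfigs.get(edge.sourceId);
            String serializer = edge.sideOutputTag != null
                    ? upstream.sideSerializers.get(edge.sideOutputTag)
                    : upstream.mainSerializer;
            NetworkOutput output = new NetworkOutput(i, serializer);
            outputsByIndex[i] = output;
            outputsByEdge.put(edge, output);
        }
        return outputsByEdge;
    }

    /** Small demo: one main-output edge and one side-output edge leaving vertex 7. */
    static String demo() {
        Edge main = new Edge(7, null);
        Edge late = new Edge(7, "late-data");
        VertexConfig config =
                new VertexConfig("IntSerializer", Map.of("late-data", "StringSerializer"));
        NetworkOutput[] byIndex = new NetworkOutput[2];
        Map<Edge, NetworkOutput> byEdge =
                createOutputs(List.of(main, late), Map.of(7, config), byIndex);
        return byEdge.get(main).serializer + "," + byEdge.get(late).serializer
                + "," + byIndex[1].writerIndex;
    }
}
```

The array gives positional access for bulk operations such as closing every output, while the map lets later steps find the output that belongs to a particular edge.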

createOutputCollector

This method contains the main chaining logic, so we analyze it in detail. The annotated createOutputCollector method is as follows:

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
    StreamTask<?, ?> containingTask,
    StreamConfig operatorConfig,
    Map<Integer, StreamConfig> chainedConfigs,
    ClassLoader userCodeClassloader,
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
    List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
    MailboxExecutorFactory mailboxExecutorFactory) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
        new ArrayList<>(4);

    // create collectors for the network outputs
    // Iterate over the non-chained StreamEdges. Their output goes over the network,
    // so the Output created for them is a RecordWriterOutput
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        // Look up the output for this StreamEdge in streamOutputs,
        // which was populated by createChainOutputs in the previous step
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    // Get all chained StreamEdges of this operator.
    // If the operator has several chained downstream operators, several outEdges are returned
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        // Get the StreamConfig of the operator this outputEdge points to
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

        // Create the streamOutput for the StreamEdge, of type WatermarkGaugeExposingOutput.
        // WatermarkGaugeExposingOutput wraps an Output together with a gauge that tracks the watermark.
        // If there is an operator that can be chained, this call recurses
        // to link the downstream operator to the upstream one
        WatermarkGaugeExposingOutput<StreamRecord<T>> output =
            createOperatorChain(
                containingTask,
                chainedOpConfig,
                chainedConfigs,
                userCodeClassloader,
                streamOutputs,
                allOperatorWrappers,
                outputEdge.getOutputTag(),
                mailboxExecutorFactory);
        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // If there is exactly one output, return it
    if (allOutputs.size() == 1) {
        return allOutputs.get(0).f0;
    } else {
        // send to N outputs. Note that this includes the special case
        // of sending to zero outputs
        // With several outputs, convert allOutputs into an array of Output
        @SuppressWarnings("unchecked")
        Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); i++) {
            asArray[i] = allOutputs.get(i).f0;
        }

        // This is the inverse of creating the normal ChainingOutput.
        // If the chaining output does not copy we need to copy in the broadcast output,
        // otherwise multi-chaining would not work correctly.
        // Create a different OutputCollector depending on whether object reuse is enabled
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            // When a StreamRecord is sent downstream, what is actually sent is a shallow copy
            // of the StreamRecord. This avoids a deep copy and improves performance. With
            // ObjectReuse enabled, however, downstream code must not modify the value of a
            // stream element, otherwise the operators that share the object interfere with each other
            return new CopyingBroadcastingOutputCollector<>(asArray, this);
        } else {
            return new BroadcastingOutputCollector<>(asArray, this);
        }
    }
}
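The object-reuse branch above hands shallow copies to the downstream outputs, which is why mutating a value downstream is dangerous. The sketch below illustrates what "shallow copy" means in this setting. It assumes, for illustration only, that all outputs except the last receive a new wrapper around the same value object and the last receives the original wrapper. Element, Output, and CopyingBroadcast are hypothetical names rather than the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of broadcasting with shallow copies; not Flink code.
final class BroadcastSketch {

    /** Stand-in for StreamRecord: a wrapper holding a value and a timestamp. */
    static final class Element<T> {
        final T value;
        final long timestamp;

        Element(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        /** New wrapper, same value object: the value itself is not cloned. */
        Element<T> shallowCopy() {
            return new Element<>(value, timestamp);
        }
    }

    interface Output<T> {
        void collect(Element<T> element);
    }

    /** Sends a shallow copy to every output except the last, which gets the original wrapper. */
    static final class CopyingBroadcast<T> implements Output<T> {
        private final List<Output<T>> outputs;

        CopyingBroadcast(List<Output<T>> outputs) {
            this.outputs = outputs;
        }

        @Override
        public void collect(Element<T> element) {
            for (int i = 0; i < outputs.size() - 1; i++) {
                outputs.get(i).collect(element.shallowCopy());
            }
            if (!outputs.isEmpty()) {
                outputs.get(outputs.size() - 1).collect(element);
            }
        }
    }

    /** Returns {wrappers are distinct, value object is shared, mutation is visible to both}. */
    static boolean[] demo() {
        List<Element<StringBuilder>> seen = new ArrayList<>();
        Output<StringBuilder> first = seen::add;
        Output<StringBuilder> second = seen::add;
        List<Output<StringBuilder>> outputs = new ArrayList<>();
        outputs.add(first);
        outputs.add(second);

        CopyingBroadcast<StringBuilder> broadcast = new CopyingBroadcast<>(outputs);
        broadcast.collect(new Element<>(new StringBuilder("x"), 1L));

        boolean distinctWrappers = seen.get(0) != seen.get(1);
        boolean sharedValue = seen.get(0).value == seen.get(1).value;
        // Mutating the value through one wrapper is visible through the other.
        seen.get(0).value.append("!");
        boolean mutationVisible = seen.get(1).value.toString().equals("x!");
        return new boolean[] {distinctWrappers, sharedValue, mutationVisible};
    }
}
```

All three flags come back true in this model: each output gets its own wrapper, but a change made to the value by one consumer shows up for the other. That is the hazard the comment in the object-reuse branch warns about.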

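Next we need to analyze the createOperatorChain method. It wraps every operator of the OperatorChain in a StreamOperatorWrapper and stores the wrappers in allOperatorWrappers in the reverse of data-flow order. Following the order of the operators, it creates one ChainingOutput after another and thereby links the data flow of the operators together. The method is as follows: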

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
    StreamTask<OUT, ?> containingTask,
    StreamConfig operatorConfig,
    Map<Integer, StreamConfig> chainedConfigs,
    ClassLoader userCodeClassloader,
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
    List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
    OutputTag<IN> outputTag,
    MailboxExecutorFactory mailboxExecutorFactory) {
    // create the output that the operator writes to first. this may recursively create more
    // operators
    // operatorConfig here is the chainedOpConfig of each loop iteration in the previous method.
    // This is a recursive call: the StreamConfig of the downstream outEdge is passed
    // to createOutputCollector again.
    // The net effect is that the output of the upstream operator points to the downstream
    // operator, which realizes the chain, that is, chained invocation.
    // The output of the most downstream operator is returned first.
    // The operator outputs are wrapped as WatermarkGaugeExposingOutput one by one,
    // from downstream to upstream
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
        createOutputCollector(
            containingTask,
            operatorConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperatorWrappers,
            mailboxExecutorFactory);

    // Create the chained operator,
    // passing in the operator output created in the previous step
    OneInputStreamOperator<IN, OUT> chainedOperator =
        createOperator(
            containingTask,
            operatorConfig,
            userCodeClassloader,
            chainedOperatorOutput,
            allOperatorWrappers,
            false);

    // Wrap the operator in an output and return it; analyzed later
    return wrapOperatorIntoOutput(
        chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag);
}
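The mutual recursion between createOutputCollector and createOperatorChain is easier to follow in a reduced model. The sketch below keeps only the control flow: an operator's output is built first (which recursively builds everything downstream), and only then is the operator itself created and wrapped as the output of its upstream neighbor. OpConfig, the string records, and the creationOrder list are hypothetical simplifications. A shared sink list stands in for the network outputs at the end of the chain.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the recursive chain construction; not Flink code.
final class RecursiveChainSketch {

    interface Output {
        void collect(String record);
    }

    /** Stand-in for StreamConfig: an operator name and the ids of its chained downstream operators. */
    static final class OpConfig {
        final String name;
        final List<Integer> chainedTargets;

        OpConfig(String name, List<Integer> chainedTargets) {
            this.name = name;
            this.chainedTargets = chainedTargets;
        }
    }

    /** Builds the output an operator writes to; recurses into every chained downstream operator. */
    static Output createOutputCollector(
            OpConfig config, Map<Integer, OpConfig> configs,
            List<String> creationOrder, List<String> sink) {
        List<Output> all = new ArrayList<>();
        for (int targetId : config.chainedTargets) {
            all.add(createOperatorChain(configs.get(targetId), configs, creationOrder, sink));
        }
        if (all.isEmpty()) {
            return sink::add; // end of the chain: stands in for a network output
        }
        if (all.size() == 1) {
            return all.get(0);
        }
        return record -> { // several chained outputs: broadcast to all of them
            for (Output output : all) {
                output.collect(record);
            }
        };
    }

    /** Creates one chained operator after its own output exists, then exposes it as an Output. */
    static Output createOperatorChain(
            OpConfig config, Map<Integer, OpConfig> configs,
            List<String> creationOrder, List<String> sink) {
        Output downstream = createOutputCollector(config, configs, creationOrder, sink);
        creationOrder.add(config.name); // the most downstream operator is recorded first
        // The "operator" appends its name to the record and forwards it.
        return record -> downstream.collect(record + ">" + config.name);
    }

    /** Chain: source(1) -> map(2) -> filter(3). Returns creation order and the processed record. */
    static String demo() {
        Map<Integer, OpConfig> configs = new HashMap<>();
        configs.put(1, new OpConfig("source", List.of(2)));
        configs.put(2, new OpConfig("map", List.of(3)));
        configs.put(3, new OpConfig("filter", List.of()));

        List<String> creationOrder = new ArrayList<>();
        List<String> sink = new ArrayList<>();
        Output mainOutput = createOutputCollector(configs.get(1), configs, creationOrder, sink);
        creationOrder.add("source"); // the main operator is created last, as in the constructor

        mainOutput.collect("r");
        return creationOrder + " " + sink;
    }
}
```

In this model demo() returns "[filter, map, source] [r>map>filter]": operators are created from downstream to upstream, while records still travel from upstream to downstream.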

The createOperator method creates a StreamOperator from operatorConfig and then wraps it in a StreamOperatorWrapper:

private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
    StreamTask<OUT, ?> containingTask,
    StreamConfig operatorConfig,
    ClassLoader userCodeClassloader,
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
    List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
    boolean isHead) {

    // now create the operator and give it the output collector to write its output to
    // Use the StreamOperatorFactory to create a StreamOperator that writes to the given output.
    // That method is fairly involved and is not covered here
    Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
        StreamOperatorFactoryUtil.createOperator(
            operatorConfig.getStreamOperatorFactory(userCodeClassloader),
            containingTask,
            operatorConfig,
            output,
            operatorEventDispatcher);

    // Get the created operator
    OP chainedOperator = chainedOperatorAndTimeService.f0;
    // Wrap the newly created operator in a StreamOperatorWrapper.
    // StreamOperatorWrapper is the wrapper type used for operators that run in a chain; analyzed later.
    // Because of the recursion, the most downstream operator reaches this point first,
    // so allOperatorWrappers ends up holding the operators in the reverse of data-flow order
    allOperatorWrappers.add(
        createOperatorWrapper(
            chainedOperator,
            containingTask,
            operatorConfig,
            chainedOperatorAndTimeService.f1,
            isHead));

    // Register a gauge that tracks the watermark
    chainedOperator
        .getMetricGroup()
        .gauge(
            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
            output.getWatermarkGauge()::getValue);
    return chainedOperator;
}
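Since the wrappers are collected in the reverse of data-flow order, turning them into a doubly linked list takes a single pass from the tail toward the head. The following is one possible way to write such a linking step. It is a sketch in the spirit of linkOperatorWrappers, not a copy of it, and Wrapper is a hypothetical stand-in for StreamOperatorWrapper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of linking wrappers into a doubly linked list; not Flink code.
final class WrapperLinkSketch {

    /** Stand-in for StreamOperatorWrapper: a name plus previous and next pointers. */
    static final class Wrapper {
        final String name;
        Wrapper previous;
        Wrapper next;

        Wrapper(String name) {
            this.name = name;
        }
    }

    /** Links wrappers given in reverse data-flow order (tail first) and returns the head. */
    static Wrapper link(List<Wrapper> reversed) {
        Wrapper downstream = null;
        for (Wrapper current : reversed) {
            if (downstream != null) {
                downstream.previous = current; // current sits upstream of the wrapper seen before
            }
            current.next = downstream;
            downstream = current;
        }
        return downstream; // the last element visited is the most upstream wrapper
    }

    /** Walks the list from the given wrapper along the next pointers. */
    static String describe(Wrapper first) {
        StringBuilder sb = new StringBuilder();
        for (Wrapper w = first; w != null; w = w.next) {
            if (sb.length() > 0) {
                sb.append("->");
            }
            sb.append(w.name);
        }
        return sb.toString();
    }

    static String demo() {
        List<Wrapper> reversed = new ArrayList<>();
        reversed.add(new Wrapper("sink"));   // index 0: tail of the chain
        reversed.add(new Wrapper("map"));
        reversed.add(new Wrapper("source")); // added last: the main operator
        return describe(link(reversed));
    }
}
```

Here demo() returns "source->map->sink". The element at index 0 of the input list ends up as the tail and the last element as the head, matching how the constructor treats allOpWrappers.get(0) as the tail wrapper.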

Here we focus on createOperatorWrapper, which wraps a StreamOperator in a StreamOperatorWrapper. You may wonder why a StreamOperatorWrapper is needed at this point. Let's look at some of the fields and methods of StreamOperatorWrapper.

private StreamOperatorWrapper<?, ?> previous;

private StreamOperatorWrapper<?, ?> next;

// Code in between omitted...

public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
    throws Exception {
    if