Flink Source Code: OperatorChain
Posted by 九师兄
1. Overview
Preface
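OperatorChain is an important optimization in Flink. It chains together as many eligible data-processing operations as possible so that they run back to back inside a single task, that is, in one thread within a slot. This minimizes thread context switches and network communication and improves the performance of the stream processing system.
The logic that decides which operations can be placed in the same chain lives in the JobGraph generation process. For details, see: Flink Source Code: JobGraph Generation.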
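Glossary
- StreamEdge: an element of the StreamGraph topology. A StreamGraph is a DAG made up of StreamNodes and StreamEdges. For details, see Flink Source Code: StreamGraph Generation.
- RecordWriterOutput: an operator output (Output) type that writes data through a RecordWriter to a ResultPartition.
- ChainingOutput: like RecordWriterOutput, an operator output type, but one used only inside an OperatorChain. It acts as a bridge that hands the data processed by an upstream operator to the downstream operator. It is analyzed in detail in a later section.
- TypeSerializer: reads a byte array from a DataInputView and deserializes it into a value of type T, or serializes a value of type T into a DataOutputView. Both DataInputView and DataOutputView operate directly on byte arrays, whose actual storage is backed by MemorySegments.
- StreamOperatorWrapper: wraps a StreamOperator and is used only by OperatorChain. It holds two pointers, one to the previous and one to the next operator, which together form a doubly linked list. This is where the notion of a "chain" comes from.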
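To make the roles of the Output types and the chain concrete, here is a minimal, self-contained sketch. The interfaces and classes in it (`SimpleOutput`, `SimpleOperator`, `SimpleMapOperator`, `SimpleChainingOutput`) are hypothetical simplifications, not Flink's real `Output` or `ChainingOutput` API. An operator emits into its output, and a ChainingOutput-like output hands the record directly to the next operator with an ordinary method call, so the whole chain runs in one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplification of Flink's Output: the place an operator emits records to.
interface SimpleOutput<T> {
    void collect(T record);
}

// Hypothetical simplification of a one-input operator.
interface SimpleOperator<IN> {
    void processElement(IN record);
}

// A map-like operator: applies a function and emits the result to its output.
class SimpleMapOperator<IN, OUT> implements SimpleOperator<IN> {
    private final Function<IN, OUT> fn;
    private final SimpleOutput<OUT> output;

    SimpleMapOperator(Function<IN, OUT> fn, SimpleOutput<OUT> output) {
        this.fn = fn;
        this.output = output;
    }

    @Override
    public void processElement(IN record) {
        output.collect(fn.apply(record));
    }
}

// Plays the role of ChainingOutput: hands each record straight to the
// downstream operator with a plain method call, in the same thread.
class SimpleChainingOutput<T> implements SimpleOutput<T> {
    private final SimpleOperator<T> next;

    SimpleChainingOutput(SimpleOperator<T> next) {
        this.next = next;
    }

    @Override
    public void collect(T record) {
        next.processElement(record);
    }
}

class ChainingDemo {
    // Chains "x + 1" into "x * 10" and collects the results in a list.
    static List<Integer> run(List<Integer> input) {
        List<Integer> sink = new ArrayList<>();
        SimpleOutput<Integer> sinkOutput = sink::add;
        // Built from downstream to upstream: the downstream operator must exist
        // before the output that feeds it can be created.
        SimpleMapOperator<Integer, Integer> times10 =
                new SimpleMapOperator<>(x -> x * 10, sinkOutput);
        SimpleMapOperator<Integer, Integer> plus1 =
                new SimpleMapOperator<>(x -> x + 1, new SimpleChainingOutput<>(times10));
        for (Integer x : input) {
            plus1.processElement(x); // the head operator receives every input record
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // [20, 30, 40]
    }
}
```

Because the bridge between the two operators is just a method call, no RecordWriter or network buffer sits between them.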
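Next, we start our analysis from the OperatorChain constructor.
Constructor
The OperatorChain is constructed in the beforeInvoke method of StreamTask (see Flink Source Code: StreamTask). Its entry point is the mainOperator, which is the first of the chained operators (how the operators are chained together is analyzed later). Whenever data arrives, it is handed to the mainOperator for processing.
The OperatorChain constructor, with annotations, is shown below: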
```java
public OperatorChain(
        StreamTask<OUT, OP> containingTask,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    // Create the dispatcher that sends and receives OperatorEvents
    this.operatorEventDispatcher =
            new OperatorEventDispatcherImpl(
                    containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
                    containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    // Get the user-code class loader
    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    // Get the task configuration
    final StreamConfig configuration = containingTask.getConfiguration();

    // Get the StreamOperator factory of the StreamTask
    StreamOperatorFactory<OUT> operatorFactory =
            configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    // StreamConfigs of all StreamOperators in the OperatorChain, keyed by vertex ID
    Map<Integer, StreamConfig> chainedConfigs =
            configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    // The out StreamEdges of this vertex, in data-flow order
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
            new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
        // Create the network outputs and fill streamOutputMap,
        // which maps each out StreamEdge to its RecordWriterOutput
        createChainOutputs(
                outEdgesInOrder,
                recordWriterDelegate,
                chainedConfigs,
                containingTask,
                streamOutputMap);

        // we create the chain of operators and grab the collector that leads into the chain
        // Collects the wrappers of all operators in the chain
        List<StreamOperatorWrapper<?, ?>> allOpWrappers =
                new ArrayList<>(chainedConfigs.size());
        // Create the output of the mainOperator, the entry operator of the OperatorChain.
        // Through ChainingOutputs, this output links all operators of the chain
        // in data-flow order
        this.mainOperatorOutput =
                createOutputCollector(
                        containingTask,
                        configuration,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputMap,
                        allOpWrappers,
                        containingTask.getMailboxExecutorFactory());

        if (operatorFactory != null) {
            // Create the mainOperator and its processing-time service
            Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
                    StreamOperatorFactoryUtil.createOperator(
                            operatorFactory,
                            containingTask,
                            configuration,
                            mainOperatorOutput,
                            operatorEventDispatcher);

            OP mainOperator = mainOperatorAndTimeService.f0;
            // Register the current-output-watermark gauge
            mainOperator
                    .getMetricGroup()
                    .gauge(
                            MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                            mainOperatorOutput.getWatermarkGauge());
            // Create the mainOperatorWrapper
            this.mainOperatorWrapper =
                    createOperatorWrapper(
                            mainOperator,
                            containingTask,
                            configuration,
                            mainOperatorAndTimeService.f1,
                            true);

            // add main operator to end of chain
            // Append mainOperatorWrapper to the end of allOpWrappers
            allOpWrappers.add(mainOperatorWrapper);

            // createOutputCollector wraps each chained operator in a StreamOperatorWrapper
            // and adds it to allOpWrappers in reverse data-flow order,
            // so the tail operatorWrapper is the element at index 0
            this.tailOperatorWrapper = allOpWrappers.get(0);
        } else {
            // No OperatorFactory: the chain contains no operators
            checkState(allOpWrappers.size() == 0);
            this.mainOperatorWrapper = null;
            this.tailOperatorWrapper = null;
        }

        // Create the chained sources
        this.chainedSources =
                createChainedSources(
                        containingTask,
                        configuration.getInputs(userCodeClassloader),
                        chainedConfigs,
                        userCodeClassloader,
                        allOpWrappers);

        this.numOperators = allOpWrappers.size();

        // Link all StreamOperatorWrappers into a doubly linked list ordered
        // from upstream to downstream
        firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

        success = true;
    } finally {
        // make sure we clean up after ourselves in case of a failure after acquiring
        // the first resources
        if (!success) {
            for (RecordWriterOutput<?> output : this.streamOutputs) {
                if (output != null) {
                    output.close();
                }
            }
        }
    }
}
```
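The constructor ends by calling `linkOperatorWrappers`, which receives `allOpWrappers` in reverse data-flow order (tail at index 0, main operator last) and returns the first wrapper. The following sketch shows one way such a list can be turned into the doubly linked list described in the glossary. `ChainWrapper`, `link` and `walk` are hypothetical names, and the body is our own illustration rather than Flink's code.

```java
import java.util.List;

// Hypothetical stand-in for StreamOperatorWrapper: a name plus the two pointers.
class ChainWrapper {
    final String name;
    ChainWrapper previous; // upstream neighbour
    ChainWrapper next;     // downstream neighbour

    ChainWrapper(String name) {
        this.name = name;
    }
}

class LinkDemo {
    // Input order: tail first, head (main operator) last.
    // Returns the head, i.e. the first operator in data-flow order (null for an empty list).
    static ChainWrapper link(List<ChainWrapper> reversed) {
        ChainWrapper downstream = null;
        for (ChainWrapper current : reversed) {
            if (downstream != null) {
                downstream.previous = current;
            }
            current.next = downstream;
            downstream = current;
        }
        return downstream;
    }

    // Follows the next pointers from the head and joins the names.
    static String walk(ChainWrapper head) {
        StringBuilder sb = new StringBuilder();
        for (ChainWrapper w = head; w != null; w = w.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(w.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same order as allOpWrappers: [tail, middle, main]
        List<ChainWrapper> all =
                List.of(new ChainWrapper("op3"), new ChainWrapper("op2"), new ChainWrapper("op1"));
        System.out.println(walk(link(all))); // op1 -> op2 -> op3
    }
}
```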
createChainOutputs
The createChainOutputs method creates a streamOutput for every out StreamEdge and records which output belongs to which edge. The code is as follows:
```java
private void createChainOutputs(
        List<StreamEdge> outEdgesInOrder,
        RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate,
        Map<Integer, StreamConfig> chainedConfigs,
        StreamTask<OUT, OP> containingTask,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap) {
    // Iterate over the ordered out StreamEdges
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge outEdge = outEdgesInOrder.get(i);

        // Create the streamOutput; the i-th edge uses the i-th record writer
        RecordWriterOutput<?> streamOutput =
                createStreamOutput(
                        recordWriterDelegate.getRecordWriter(i),
                        outEdge,
                        chainedConfigs.get(outEdge.getSourceId()),
                        containingTask.getEnvironment());

        // Store it in the streamOutputs array
        this.streamOutputs[i] = streamOutput;
        // Remember which streamOutput belongs to which StreamEdge
        streamOutputMap.put(outEdge, streamOutput);
    }
}
```
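The important detail in `createChainOutputs` is that the i-th out edge is paired with the i-th record writer, so the order of `outEdgesInOrder` has to match the order of the record writers. Below is a minimal sketch of this positional pairing, using strings as hypothetical stand-ins for `StreamEdge`, `RecordWriter` and `RecordWriterOutput`. The size check is an addition of this sketch, not something shown in the source.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PositionalOutputsDemo {
    // Pairs edgesInOrder.get(i) with writers.get(i). Like createChainOutputs, it fills both
    // an array indexed by position and a map keyed by edge.
    static Map<String, String> pair(List<String> edgesInOrder, List<String> writers, String[] outputs) {
        if (writers.size() != edgesInOrder.size() || outputs.length != edgesInOrder.size()) {
            throw new IllegalArgumentException("expected one writer and one array slot per edge");
        }
        Map<String, String> outputByEdge = new LinkedHashMap<>();
        for (int i = 0; i < edgesInOrder.size(); i++) {
            // Stand-in for createStreamOutput(recordWriterDelegate.getRecordWriter(i), outEdge, ...)
            String output = "output[" + writers.get(i) + "]";
            outputs[i] = output;
            outputByEdge.put(edgesInOrder.get(i), output);
        }
        return outputByEdge;
    }

    public static void main(String[] args) {
        String[] outputs = new String[2];
        Map<String, String> byEdge =
                pair(List.of("map->sink", "map->sideSink"), List.of("writer0", "writer1"), outputs);
        System.out.println(byEdge); // {map->sink=output[writer0], map->sideSink=output[writer1]}
    }
}
```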
Next, let's look at the createStreamOutput method:
```java
private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
    // Get the output tag; it is null unless the edge carries a side output
    OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput

    TypeSerializer outSerializer = null;

    // Pick the type serializer depending on whether this is a side output
    if (edge.getOutputTag() != null) {
        // side output
        outSerializer =
                upStreamConfig.getTypeSerializerSideOut(
                        edge.getOutputTag(),
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    } else {
        // main output
        outSerializer =
                upStreamConfig.getTypeSerializerOut(
                        taskEnvironment.getUserCodeClassLoader().asClassLoader());
    }

    // Create and return the RecordWriterOutput
    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
```
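The branch in `createStreamOutput` boils down to one rule: an edge that carries an `OutputTag` is a side-output edge and is serialized with the serializer registered for that tag, and every other edge uses the main output serializer. The sketch below models only that rule. `Edge`, `chooseSerializer` and the map of per-tag serializers are hypothetical stand-ins for `StreamEdge`, `StreamConfig#getTypeSerializerSideOut` and `StreamConfig#getTypeSerializerOut`, serializers are represented by their names, and failing on an unknown tag is this sketch's own choice.

```java
import java.util.Map;

class SerializerChoiceDemo {
    // Hypothetical stand-in for StreamEdge: outputTag is null for a main-output edge.
    static final class Edge {
        final String outputTag;

        Edge(String outputTag) {
            this.outputTag = outputTag;
        }
    }

    // Mirrors the branch in createStreamOutput.
    static String chooseSerializer(Edge edge, String mainSerializer, Map<String, String> sideSerializers) {
        if (edge.outputTag != null) {
            // side output: use the serializer registered for this tag
            String serializer = sideSerializers.get(edge.outputTag);
            if (serializer == null) {
                throw new IllegalStateException("no serializer for side output " + edge.outputTag);
            }
            return serializer;
        }
        // main output
        return mainSerializer;
    }

    public static void main(String[] args) {
        Map<String, String> side = Map.of("late-events", "EventSerializer");
        System.out.println(chooseSerializer(new Edge(null), "LongSerializer", side));          // LongSerializer
        System.out.println(chooseSerializer(new Edge("late-events"), "LongSerializer", side)); // EventSerializer
    }
}
```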
createOutputCollector
This method contains the core chaining logic, so we analyze it in detail. createOutputCollector is shown below:
```java
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        MailboxExecutorFactory mailboxExecutorFactory) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
            new ArrayList<>(4);

    // create collectors for the network outputs
    // Non-chained StreamEdges go over the network,
    // so the Output created for them is a RecordWriterOutput
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        // Look up the output that createChainOutputs created for this StreamEdge
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    // All chained StreamEdges of this operator; an operator with several
    // chained downstream operators has several such edges
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        // StreamConfig of the downstream (target) operator of this edge
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

        // Create the output for this StreamEdge. WatermarkGaugeExposingOutput wraps an Output
        // together with a gauge that tracks its watermark. For a chainable operator this
        // recurses, linking the downstream operator to the upstream one
        WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                createOperatorChain(
                        containingTask,
                        chainedOpConfig,
                        chainedConfigs,
                        userCodeClassloader,
                        streamOutputs,
                        allOperatorWrappers,
                        outputEdge.getOutputTag(),
                        mailboxExecutorFactory);
        // Add it to allOutputs
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // With exactly one output, return it directly
    if (allOutputs.size() == 1) {
        return allOutputs.get(0).f0;
    } else {
        // send to N outputs. Note that this includes the special case
        // of sending to zero outputs
        // With zero or several outputs, collect them into an Output array
        @SuppressWarnings("unchecked")
        Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
        for (int i = 0; i < allOutputs.size(); i++) {
            asArray[i] = allOutputs.get(i).f0;
        }

        // This is the inverse of creating the normal ChainingOutput.
        // If the chaining output does not copy we need to copy in the broadcast output,
        // otherwise multi-chaining would not work correctly.
        // Choose the collector depending on whether object reuse is enabled
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            // Each record sent downstream is a shallow copy of the StreamRecord (a new wrapper
            // around the same value) rather than a deep copy, which is cheaper. With object
            // reuse enabled, downstream operators must not modify the element value itself,
            // otherwise the other outputs sharing the same value object will see the change
            return new CopyingBroadcastingOutputCollector<>(asArray, this);
        } else {
            return new BroadcastingOutputCollector<>(asArray, this);
        }
    }
}
```
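Why does object reuse decide between the two collectors? The sketch below models the problem with a hypothetical mutable wrapper `BRecord` whose `replace` method reuses the wrapper for a new value, similar in spirit to `StreamRecord#replace`. The branches here receive the wrapper without any copying, as a non-copying chaining output would pass it on when object reuse is enabled. `SharedBroadcast` sends the same wrapper to every branch, while `CopyingBroadcast` gives every branch except the last a shallow copy. These classes are simplified stand-ins, not `BroadcastingOutputCollector` or `CopyingBroadcastingOutputCollector` themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record wrapper: replace() reuses the same wrapper for a new value.
class BRecord {
    String value;

    BRecord(String value) {
        this.value = value;
    }

    BRecord replace(String newValue) {
        this.value = newValue;
        return this;
    }

    // Shallow copy: a new wrapper around the same value object.
    BRecord shallowCopy() {
        return new BRecord(value);
    }
}

interface BOutput {
    void collect(BRecord record);
}

// Sends the very same wrapper object to every output.
class SharedBroadcast implements BOutput {
    private final BOutput[] outputs;

    SharedBroadcast(BOutput... outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(BRecord record) {
        for (BOutput output : outputs) {
            output.collect(record);
        }
    }
}

// Gives every output except the last its own shallow copy of the wrapper.
class CopyingBroadcast implements BOutput {
    private final BOutput[] outputs;

    CopyingBroadcast(BOutput... outputs) {
        this.outputs = outputs;
    }

    @Override
    public void collect(BRecord record) {
        for (int i = 0; i < outputs.length - 1; i++) {
            outputs[i].collect(record.shallowCopy());
        }
        if (outputs.length > 0) {
            outputs[outputs.length - 1].collect(record); // the last output gets the original
        }
    }
}

class BroadcastDemo {
    // Branch 1 upper-cases the value by reusing the wrapper; branch 2 remembers what it got.
    static String secondBranchSees(boolean copying) {
        List<String> seen = new ArrayList<>();
        BOutput upperCaseBranch = r -> r.replace(r.value.toUpperCase());
        BOutput recordingBranch = r -> seen.add(r.value);
        BOutput broadcast = copying
                ? new CopyingBroadcast(upperCaseBranch, recordingBranch)
                : new SharedBroadcast(upperCaseBranch, recordingBranch);
        broadcast.collect(new BRecord("hello"));
        return seen.get(0);
    }

    public static void main(String[] args) {
        System.out.println(secondBranchSees(false)); // HELLO: branch 1 changed the shared wrapper
        System.out.println(secondBranchSees(true));  // hello: branch 1 only changed its own copy
    }
}
```

Note that a shallow copy still shares the value object. That is why downstream operators must not mutate the value itself when object reuse is enabled.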
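Next we analyze the createOperatorChain method. It wraps every operator of the OperatorChain in a StreamOperatorWrapper and stores the wrappers in allOperatorWrappers in reverse data-flow order. Following the order of the operators, it creates one ChainingOutput after another, linking the data flow of all the operators together. The method looks like this: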
```java
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        OutputTag<IN> outputTag,
        MailboxExecutorFactory mailboxExecutorFactory) {
    // create the output that the operator writes to first. this may recursively create more
    // operators
    // operatorConfig is the chainedOpConfig of the current loop iteration in
    // createOutputCollector. The recursive call passes the downstream operator's StreamConfig
    // back into createOutputCollector, so every upstream operator's output ends up pointing
    // at its downstream operator; this is what forms the chain.
    // The most downstream output is returned first, and the outputs are wrapped as
    // WatermarkGaugeExposingOutput from downstream to upstream
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput =
            createOutputCollector(
                    containingTask,
                    operatorConfig,
                    chainedConfigs,
                    userCodeClassloader,
                    streamOutputs,
                    allOperatorWrappers,
                    mailboxExecutorFactory);

    // Create the chained operator, passing in the output created in the previous step
    OneInputStreamOperator<IN, OUT> chainedOperator =
            createOperator(
                    containingTask,
                    operatorConfig,
                    userCodeClassloader,
                    chainedOperatorOutput,
                    allOperatorWrappers,
                    false);

    // Wrap the operator into an output and return it (analyzed later)
    return wrapOperatorIntoOutput(
            chainedOperator, containingTask, operatorConfig, userCodeClassloader, outputTag);
}
```
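The recursion between `createOutputCollector` and `createOperatorChain` is easier to follow on a toy model. In the sketch below, an operator is just a function that appends its name to a string record, a ChainingOutput-like output is a function that calls the next operator, and the network output is a list. `RecursiveChainDemo` and its config map (operator to its single chained successor) are hypothetical. Only the call structure mirrors the methods above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class RecursiveChainDemo {
    // Hypothetical chaining config: operator -> its single chained successor.
    // An operator without an entry writes to the network (non-chained) output.
    private final Map<String, String> chainedSuccessor;
    // Records that left the chain through the network output.
    final List<String> network = new ArrayList<>();

    RecursiveChainDemo(Map<String, String> chainedSuccessor) {
        this.chainedSuccessor = chainedSuccessor;
    }

    // Mirrors createOutputCollector: returns the output that `op` writes to.
    Consumer<String> createOutputCollector(String op) {
        String next = chainedSuccessor.get(op);
        if (next == null) {
            return network::add; // stand-in for a RecordWriterOutput
        }
        return createOperatorChain(next);
    }

    // Mirrors createOperatorChain: first build (recursively) the output that `op`
    // writes to, then the operator, then return the chaining output that feeds it.
    Consumer<String> createOperatorChain(String op) {
        Consumer<String> out = createOutputCollector(op);
        Consumer<String> operator = createOperator(op, out);
        return record -> operator.accept(record); // plays the role of ChainingOutput
    }

    // Each toy operator appends its name to the record and emits it.
    static Consumer<String> createOperator(String op, Consumer<String> out) {
        return record -> out.accept(record + " -> " + op);
    }

    static List<String> run(Map<String, String> config, String mainOperator, String record) {
        RecursiveChainDemo chain = new RecursiveChainDemo(config);
        // As in the constructor: the main operator's output is created first, then the operator.
        Consumer<String> head = createOperator(mainOperator, chain.createOutputCollector(mainOperator));
        head.accept(record);
        return chain.network;
    }

    public static void main(String[] args) {
        System.out.println(run(Map.of("A", "B", "B", "C"), "A", "x")); // [x -> A -> B -> C]
    }
}
```

For the chain A -> B -> C, the output of C is built first and the output of A last, yet a record fed into A passes through A, B and C in data-flow order before reaching the network output.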
The createOperator method creates a StreamOperator from operatorConfig and then wraps it in a StreamOperatorWrapper:
```java
private <OUT, OP extends StreamOperator<OUT>> OP createOperator(
        StreamTask<OUT, ?> containingTask,
        StreamConfig operatorConfig,
        ClassLoader userCodeClassloader,
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> output,
        List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
        boolean isHead) {

    // now create the operator and give it the output collector to write its output to
    // Use the StreamOperatorFactory to create a StreamOperator that writes to the given output.
    // This method is fairly involved and is not covered here
    Tuple2<OP, Optional<ProcessingTimeService>> chainedOperatorAndTimeService =
            StreamOperatorFactoryUtil.createOperator(
                    operatorConfig.getStreamOperatorFactory(userCodeClassloader),
                    containingTask,
                    operatorConfig,
                    output,
                    operatorEventDispatcher);

    // The newly created operator
    OP chainedOperator = chainedOperatorAndTimeService.f0;

    // Wrap the new operator in a StreamOperatorWrapper, the wrapper type used to run
    // operators inside a chain (analyzed later).
    // Because of the recursion, the most downstream operator reaches this point first,
    // so allOperatorWrappers holds the operators in reverse data-flow order
    allOperatorWrappers.add(
            createOperatorWrapper(
                    chainedOperator,
                    containingTask,
                    operatorConfig,
                    chainedOperatorAndTimeService.f1,
                    isHead));

    // Register a gauge that reports the current output watermark
    chainedOperator
            .getMetricGroup()
            .gauge(
                    MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
                    output.getWatermarkGauge()::getValue);
    return chainedOperator;
}
```
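Because `createOperator` appends a wrapper only after the recursive call for the downstream part has returned, `allOperatorWrappers` ends up in reverse data-flow order, and the constructor then appends the main operator at the end. The sketch below reproduces just that bookkeeping on a hypothetical config map (operator to its chained successors, in edge order), with operator names standing in for wrappers. `WrapperOrderDemo` and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class WrapperOrderDemo {
    // Returns the wrapper list in the order OperatorChain would build it.
    static List<String> buildWrapperList(Map<String, List<String>> chained, String mainOperator) {
        List<String> allWrappers = new ArrayList<>();
        createOutputCollector(mainOperator, chained, allWrappers);
        // The constructor appends the main (head) operator last.
        allWrappers.add(mainOperator);
        return allWrappers;
    }

    // Mirrors createOutputCollector: recurse into every chained successor, in edge order.
    private static void createOutputCollector(
            String op, Map<String, List<String>> chained, List<String> allWrappers) {
        for (String next : chained.getOrDefault(op, List.of())) {
            createOperatorChain(next, chained, allWrappers);
        }
    }

    // Mirrors createOperatorChain + createOperator: the downstream part is built first,
    // and only then is this operator wrapped and added to the list.
    private static void createOperatorChain(
            String op, Map<String, List<String>> chained, List<String> allWrappers) {
        createOutputCollector(op, chained, allWrappers);
        allWrappers.add(op);
    }

    public static void main(String[] args) {
        // Linear chain A -> B -> C
        System.out.println(buildWrapperList(Map.of("A", List.of("B"), "B", List.of("C")), "A")); // [C, B, A]
        // Branching chain: A -> B -> D and A -> C
        System.out.println(buildWrapperList(Map.of("A", List.of("B", "C"), "B", List.of("D")), "A")); // [D, B, C, A]
    }
}
```

For the linear chain the tail `C` sits at index 0, matching `allOpWrappers.get(0)` in the constructor. In the branching case, index 0 holds the last operator of the first chained branch, simply because of the loop order in `createOutputCollector`.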
Let's focus on createOperatorWrapper. It wraps a StreamOperator in a StreamOperatorWrapper. You may wonder why a StreamOperatorWrapper is needed here at all, so let's look at some of its fields and methods.
```java
private StreamOperatorWrapper<?, ?> previous;

private StreamOperatorWrapper<?, ?> next;

// ... (omitted)

public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint)
        throws Exception {
    // ...
}
```
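The `previous` and `next` pointers let an operation start at one end of the chain and travel wrapper by wrapper. The code shown here stops at the start of `close`, so the following is only a hypothetical sketch of such propagation, not the body of Flink's `StreamOperatorWrapper#close`. In the sketch, each wrapper flushes a buffered record downstream, closes itself and then forwards the call to `next`. Starting at the head therefore lets downstream operators still receive what upstream operators emit while closing. `CloseSketchWrapper` and its buffering behavior are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper: an operator plus a pointer to its downstream neighbour.
class CloseSketchWrapper {
    final String name;
    CloseSketchWrapper next;
    private final List<String> log;
    private boolean closed;
    private String buffered; // a record this operator holds back until close

    CloseSketchWrapper(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void process(String record) {
        if (closed) {
            throw new IllegalStateException(name + " received " + record + " after close");
        }
        log.add(name + " got " + record);
        if (next != null) {
            next.process(record);
        }
    }

    void buffer(String record) {
        buffered = record;
    }

    // Flushes the buffered record downstream, closes this operator,
    // then propagates the call along the next pointer.
    // (A tail with no next has nowhere to flush to in this sketch; its buffer is dropped.)
    void close() {
        if (buffered != null && next != null) {
            next.process(buffered);
        }
        buffered = null;
        closed = true;
        log.add(name + " closed");
        if (next != null) {
            next.close();
        }
    }
}

class CloseSketchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CloseSketchWrapper a = new CloseSketchWrapper("A", log);
        CloseSketchWrapper b = new CloseSketchWrapper("B", log);
        CloseSketchWrapper c = new CloseSketchWrapper("C", log);
        a.next = b;
        b.next = c;
        a.buffer("late");
        a.close(); // start at the head
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it logs `B got late`, `C got late`, then `A closed`, `B closed`, `C closed`: the record flushed by `A` reaches `B` and `C` before either of them is closed.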