Flink Source Code: Buffer Timeout Optimization
Posted by 九师兄
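1. Overview
2. The Buffer Timeout Concept
A Flink operator sends data downstream when either of two conditions is met:
- the output buffer is full
- the data in the buffer has waited longer than the configured buffer timeout (default: 100 ms)
This setting has a major impact on Flink performance. If it is set too low, latency is small, but the job performs a large number of small, frequent network transfers and CPU usage rises sharply. If it is set too high, data is mostly sent only once a buffer fills up, and latency grows considerably. Some articles report that, under high parallelism and when the job is not very latency-sensitive, raising the buffer timeout moderately (to around 1 s) can reduce CPU usage by 30% - 50%.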
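The two conditions can be made concrete with a small, self-contained Java sketch. It is a toy model, not Flink code: the class ToyOutputBuffer, its methods, and the simulated clock passed in as nowMillis are hypothetical names introduced here for illustration, and the buffer counts records rather than bytes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an output buffer. Hypothetical; this is not a Flink class. */
class ToyOutputBuffer {
    private final int capacity;        // assumption: capacity counted in records, not bytes
    private final long timeoutMillis;  // maximum time the oldest buffered record may wait
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sentBatches = new ArrayList<>();
    private long oldestRecordTime;

    ToyOutputBuffer(int capacity, long timeoutMillis) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
    }

    /** Buffers a record at simulated time nowMillis; sends everything once the buffer is full. */
    void add(String record, long nowMillis) {
        if (pending.isEmpty()) {
            oldestRecordTime = nowMillis;
        }
        pending.add(record);
        if (pending.size() >= capacity) {
            flush(); // condition 1: buffer is full
        }
    }

    /** Timer check: sends buffered records once the oldest has waited timeoutMillis. */
    void tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - oldestRecordTime >= timeoutMillis) {
            flush(); // condition 2: timeout elapsed
        }
    }

    private void flush() {
        sentBatches.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }
}
```

With a capacity of 3 and a 100 ms timeout, two records added at t=0 and t=10 are sent by tick(100), while three records added in quick succession are sent immediately because the buffer fills up. A smaller timeout produces more, smaller batches; a larger one produces fewer, fuller batches at the cost of waiting time.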
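3. Configuring the Buffer Timeout
The buffer timeout can be set at two levels: globally and per operator.
The global buffer timeout is configured through the StreamExecutionEnvironment.setBufferTimeout method. The code is as follows: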
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
    if (timeoutMillis < -1) {
        throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
    }
    this.bufferTimeout = timeoutMillis;
    return this;
}
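The bufferTimeout set on StreamExecutionEnvironment is used as the default buffer timeout when the StreamGraph is built. If the user has not given an operator its own buffer timeout, the default is applied automatically.
An operator-level buffer timeout affects only that one operator. The operator level corresponds to SingleOutputStreamOperator. Here is its setBufferTimeout method: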
public SingleOutputStreamOperator<T> setBufferTimeout(long timeoutMillis) {
    checkArgument(timeoutMillis >= -1, "timeout must be >= -1");
    transformation.setBufferTimeout(timeoutMillis);
    return this;
}
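It sets the bufferTimeout property on the Transformation object that corresponds to the operator.
4. How the Buffer Timeout Affects the StreamGraph
Flink uses a variety of translators to turn Transformations into a StreamGraph. Here is a fragment of the configure method of their base class, SimpleTransformationTranslator: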
// ...
StreamGraphUtils.configureBufferTimeout(
        streamGraph, transformationId, transformation, context.getDefaultBufferTimeout());
// ...
It uses StreamGraphUtils to configure the buffer timeout in the StreamGraph. For the details we need to look at the configureBufferTimeout method:
public static <T> void configureBufferTimeout(
        StreamGraph streamGraph,
        int nodeId,
        Transformation<T> transformation,
        long defaultBufferTimeout) {
    if (transformation.getBufferTimeout() >= 0) {
        streamGraph.setBufferTimeout(nodeId, transformation.getBufferTimeout());
    } else {
        streamGraph.setBufferTimeout(nodeId, defaultBufferTimeout);
    }
}
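The configureBufferTimeout method takes four parameters: the StreamGraph being generated, the StreamNode id, the Transformation, and the default buffer timeout (the StreamExecutionEnvironment-level setting serves as the default). An operator-level value is used only when it is non-negative; otherwise the default applies.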
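The fallback rule in configureBufferTimeout can be isolated in a few lines. The sketch below is a toy model, not Flink code: the class TimeoutFallback and the method resolve are hypothetical names. It suggests one consequence that is easy to miss: judging by the >= 0 check, an operator-level value of -1 appears to mean "not set" and therefore inherits the default rather than disabling the timeout for that operator.

```java
/** Toy model of the operator-vs-default fallback. Hypothetical; not a Flink class. */
class TimeoutFallback {

    /**
     * Mirrors the branch in configureBufferTimeout: a non-negative operator-level
     * value wins, anything negative falls back to the environment-level default.
     */
    static long resolve(long operatorTimeout, long defaultTimeout) {
        return operatorTimeout >= 0 ? operatorTimeout : defaultTimeout;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, 100)); // operator unset: default is used
        System.out.println(resolve(5, 100));  // operator value overrides the default
        System.out.println(resolve(0, 100));  // 0 is a valid operator-level value
        System.out.println(resolve(-1, -1));  // -1 survives only if the default is -1
    }
}
```

Under this reading, the value -1 reaches a node only when the environment-level default itself is -1.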
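configureBufferTimeout in turn calls StreamGraph's setBufferTimeout method, so we keep following the call chain. That method sets the bufferTimeout property on the StreamNode that corresponds to the Transformation.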
public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
    if (getStreamNode(vertexID) != null) {
        getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
    }
}
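At this point we know that the buffer timeout a user sets for an operator ends up in the bufferTimeout property of that operator's StreamNode in the StreamGraph.
The next section analyzes how the bufferTimeout property affects the way Flink processes data.
5. Runtime Behavior
Let's look at the StreamEdge constructor: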
public StreamEdge(
        StreamNode sourceVertex,
        StreamNode targetVertex,
        int typeNumber,
        StreamPartitioner<?> outputPartitioner,
        OutputTag outputTag,
        StreamExchangeMode exchangeMode) {
    this(
            sourceVertex,
            targetVertex,
            typeNumber,
            sourceVertex.getBufferTimeout(),
            outputPartitioner,
            outputTag,
            exchangeMode);
}
As the constructor shows, a StreamEdge's bufferTimeout is determined by sourceVertex, that is, by the bufferTimeout property of the StreamNode upstream of the edge.
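A tiny model makes the direction of inheritance explicit. This is only a sketch: ToyNode and ToyEdge are hypothetical stand-ins for StreamNode and StreamEdge and keep nothing but the fields needed to show the rule.

```java
/** Hypothetical stand-in for StreamNode; keeps only an id and a buffer timeout. */
class ToyNode {
    final int id;
    final long bufferTimeout;

    ToyNode(int id, long bufferTimeout) {
        this.id = id;
        this.bufferTimeout = bufferTimeout;
    }
}

/** Hypothetical stand-in for StreamEdge. */
class ToyEdge {
    final ToyNode source;
    final ToyNode target;
    final long bufferTimeout;

    ToyEdge(ToyNode source, ToyNode target) {
        this.source = source;
        this.target = target;
        // Same rule as the constructor above: the upstream node decides.
        this.bufferTimeout = source.bufferTimeout;
    }
}
```

For an edge from a node with a 5 ms timeout to a node with a 100 ms timeout, the edge carries 5 ms. In this model the timeout of the downstream node plays no role for its incoming edge, which matches the idea that the timeout governs how an operator sends data, not how it receives it.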
Tracing where the StreamEdge's bufferTimeout is used, we arrive at the StreamTask.createRecordWriters method:
private static <OUT>
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
                StreamConfig configuration, Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
            new ArrayList<>();
    List<StreamEdge> outEdgesInOrder =
            configuration.getOutEdgesInOrder(
                    environment.getUserCodeClassLoader().asClassLoader());

    // Iterate over the StreamEdges and create one RecordWriter per edge.
    // Each RecordWriter's bufferTimeout is the bufferTimeout of its edge.
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
                createRecordWriter(
                        edge,
                        i,
                        environment,
                        environment.getTaskInfo().getTaskNameWithSubtasks(),
                        edge.getBufferTimeout()));
    }
    return recordWriters;
}
A fragment of the createRecordWriter method follows. It shows that the RecordWriter is created through RecordWriterBuilder:
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
                .setChannelSelector(outputPartitioner)
                .setTimeout(bufferTimeout)
                .setTaskName(taskName)
                .build(bufferWriter);
Next, the build method of RecordWriterBuilder:
public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}
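Whether the builder creates a BroadcastRecordWriter (which writes data to the output buffers in broadcast fashion) or a ChannelSelectorRecordWriter (which writes data to a specific channel, for example after a keyBy operator), both have RecordWriter as their parent class. RecordWriter is therefore what we analyze next.
Looking at the RecordWriter constructor, we find that it creates an OutputFlusher object, unless the network buffer timeout is disabled or set to flush after every record: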
RecordWriter(ResultPartitionWriter writer, long timeout, String taskName) {
    this.targetPartition = writer;
    this.numberOfChannels = writer.getNumberOfSubpartitions();
    this.serializer = new DataOutputSerializer(128);

    checkArgument(timeout >= ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
    this.flushAlways = (timeout == ExecutionOptions.FLUSH_AFTER_EVERY_RECORD);
    if (timeout == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT
            || timeout == ExecutionOptions.FLUSH_AFTER_EVERY_RECORD) {
        outputFlusher = null;
    } else {
        String threadName =
                taskName == null
                        ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME
                        : DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;
        outputFlusher = new OutputFlusher(threadName, timeout);
        outputFlusher.start();
    }
}
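The branches of the constructor can be summarized as a classification of the timeout value. The sketch below is a toy model: TimeoutModes, Mode, and classify are hypothetical names, and the constants -1 and 0 are assumed to match ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT and ExecutionOptions.FLUSH_AFTER_EVERY_RECORD.

```java
/** Toy classification of timeout values. Hypothetical; not a Flink class. */
class TimeoutModes {
    // Assumed to mirror ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT.
    static final long DISABLED = -1L;
    // Assumed to mirror ExecutionOptions.FLUSH_AFTER_EVERY_RECORD.
    static final long FLUSH_AFTER_EVERY_RECORD = 0L;

    enum Mode {
        ONLY_WHEN_BUFFER_FULL,   // no flusher thread; data leaves when a buffer fills
        AFTER_EVERY_RECORD,      // no flusher thread; flushAlways would be true
        PERIODIC_FLUSHER_THREAD  // a background thread flushes every timeout ms
    }

    static Mode classify(long timeoutMillis) {
        if (timeoutMillis < DISABLED) {
            throw new IllegalArgumentException("timeout must be >= -1");
        }
        if (timeoutMillis == DISABLED) {
            return Mode.ONLY_WHEN_BUFFER_FULL;
        }
        if (timeoutMillis == FLUSH_AFTER_EVERY_RECORD) {
            return Mode.AFTER_EVERY_RECORD;
        }
        return Mode.PERIODIC_FLUSHER_THREAD;
    }
}
```

Only strictly positive values lead to a flusher thread in this model, which is why the default of 100 ms falls into the periodic case.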
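OutputFlusher runs on a dedicated thread and periodically, asynchronously, calls the flushAll() method of targetPartition. The interval between calls is the value passed to setBufferTimeout.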
@Override
public void run() {
    try {
        while (running) {
            try {
                // Sleep for the timeout, then flush all data.
                Thread.sleep(timeout);
            } catch (InterruptedException e) {
                // propagate this if we are still running, because it should not happen
                // in that case
                if (running) {
                    throw new Exception(e);
                }
            }

            // any errors here should let the thread come to a halt and be
            // recognized by the writer
            flushAll();
        }
    } catch (Throwable t) {
        notifyFlusherException(t);
    }
}
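The sleep-then-flush pattern can be tried out in isolation. The following is a minimal sketch and not Flink's OutputFlusher: ToyFlusher, terminate, and runFor are hypothetical names, the flush action is just a counter, and, unlike the loop above, this version exits on interruption without a final flush.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy periodic flusher thread. Hypothetical; not Flink's OutputFlusher. */
class ToyFlusher extends Thread {
    private final long timeoutMillis;
    private final Runnable flushAction;
    private volatile boolean running = true;

    ToyFlusher(long timeoutMillis, Runnable flushAction) {
        super("toy-output-flusher");
        setDaemon(true); // do not keep the JVM alive for the demo
        this.timeoutMillis = timeoutMillis;
        this.flushAction = flushAction;
    }

    /** Stops the loop and wakes the thread if it is sleeping. */
    void terminate() {
        running = false;
        interrupt();
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(timeoutMillis);
            } catch (InterruptedException e) {
                return; // simplification: leave the loop without a final flush
            }
            flushAction.run();
        }
    }

    /** Runs a flusher for durationMillis and returns how many flushes happened. */
    static int runFor(long timeoutMillis, long durationMillis) {
        AtomicInteger flushes = new AtomicInteger();
        ToyFlusher flusher = new ToyFlusher(timeoutMillis, flushes::incrementAndGet);
        flusher.start();
        try {
            Thread.sleep(durationMillis);
            flusher.terminate();
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flushes.get();
    }
}
```

Because the thread flushes on a fixed cadence rather than tracking the age of individual records, a record written just after a flush waits close to the full timeout, while one written just before a flush leaves almost immediately. In this model the timeout is therefore an upper bound on the added delay, not a fixed delay.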
This completes our walk through the buffer timeout, from its configuration, through StreamGraph generation, to its effect on how Flink sends data.