Flink in Detail: JobGraph
Posted by 小猪猪家的大猪猪
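Overview
JobGraph is the product of optimizing the StreamGraph; the client sends the optimized JobGraph to the JobManager (JM). The rest of this article relies on some background knowledge, so readers who have not read the earlier installments of this series may want to read them first.
On the client side, Flink converts the StreamGraph object into a JobGraph object. The core of this conversion is merging multiple StreamNode nodes that satisfy certain conditions into a single JobVertex node. This optimization is called operator chaining, and it reduces the serialization and deserialization needed when data passes between nodes. Operators in the same operator chain run in the same TaskSlot, which can also be understood as running in the same thread; this lowers the cost of thread switching, raises throughput, and reduces latency.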
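To make the serialization argument concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: HandOffSketch, viaBytes, unchained and chained are hypothetical names, and DataOutputStream merely stands in for whatever serializer a real job would use. It shows that two operators connected through a byte hand-off pay one serialization round trip per record, while the chained version calls the second operator directly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.IntUnaryOperator;

// Toy model only: none of these names are Flink classes.
class HandOffSketch {
    // Counts how many records were serialized between operators.
    static int serializations = 0;

    // Unchained hand-off: the record is written to bytes and read back.
    static int viaBytes(int record) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(record);
            out.flush();
            serializations++;
            DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two operators in separate tasks: one byte hand-off per record.
    static int unchained(int record, IntUnaryOperator first, IntUnaryOperator second) {
        return second.applyAsInt(viaBytes(first.applyAsInt(record)));
    }

    // Two operators in one chain: the second is invoked directly on the result.
    static int chained(int record, IntUnaryOperator first, IntUnaryOperator second) {
        return second.applyAsInt(first.applyAsInt(record));
    }

    public static void main(String[] args) {
        IntUnaryOperator addOne = a -> a + 1;
        IntUnaryOperator timesTwo = a -> a * 2;
        System.out.println(unchained(3, addOne, timesTwo)); // prints 8
        System.out.println(chained(3, addOne, timesTwo));   // prints 8
        System.out.println("serializations: " + serializations); // prints serializations: 1
    }
}
```

Both paths compute the same result; only the unchained path touches the serializer, which is the overhead that chaining removes.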
Source Code Analysis
Building the JobGraph
The JobGraph code lives mainly in the flink-runtime module, in org.apache.flink.runtime.jobgraph.JobGraph. The call chain is StreamGraph#getJobGraph → StreamingJobGraphGenerator#createJobGraph().
/*------------------------ StreamGraph ---------------------------*/
// Entry point for building the JobGraph
public JobGraph getJobGraph() {
    return getJobGraph(null);
}

public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
/*---------------------------------------------------------*/
/*--------------- StreamingJobGraphGenerator ------------------*/
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}

private JobGraph createJobGraph() {
    // Pre-validation
    preValidate();
    // Copy the job type (batch or streaming) from the StreamGraph to the JobGraph
    jobGraph.setJobType(streamGraph.getJobType());
    // Tell the JobGraph whether approximate local recovery is enabled
    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
    // Generate a deterministic hash for every StreamNode
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    // This is the key step: the vertices and edges of the JobGraph are created here.
    // It tries to merge as many StreamNodes as possible into one JobGraph vertex:
    // it decides which operators can be chained, creates the JobVertex objects,
    // and generates the JobEdges.
    setChaining(hashes, legacyHashes);
    // Set the physical edges
    setPhysicalEdges();
    // Set the SlotSharingGroup and CoLocationGroup of the JobGraph
    setSlotSharingAndCoLocation();
    // Set the managed memory fractions
    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
    // Set the checkpoint information for every JobVertex of the JobGraph.
    // For example, source JobVertices need to trigger checkpoints,
    // and all JobVertices need to commit and acknowledge checkpoints.
    configureCheckpointing();
    // Set the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
    // Register the user artifacts (distributed cache entries)
    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());
    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }
    // Set the runtime configuration
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    // Return the JobGraph object
    return jobGraph;
}
/*---------------------------------------------------------*/
// Enum defining the type of a Flink job
public enum JobType {
    // Batch job
    BATCH,
    // Streaming job
    STREAMING
}
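As the analysis above shows, the most important step in going from StreamGraph to JobGraph is building the operator chains in setChaining(hashes, legacyHashes). Chaining packs as many operations as possible into a single node, which avoids unnecessary thread switches and network communication. Take a simple example, DataStream.map(a -> a+1).filter(a -> a > 2): the data stream has two processing steps, that is, two operators, map and filter. Without chaining, these two operators become separate StreamNode objects and separate Tasks; if the two Tasks are not in the same TaskSlot or the same TaskManager (TM), records have to travel over the network, and performance suffers. To avoid this, Flink introduces the concept of an operator chain: a sequence of operators that can execute in the same TaskSlot.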
/*--------------- StreamingJobGraphGenerator ------------------*/
// Recursively create JobVertex objects starting from the StreamNodes
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());
    // Create the operator chains
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

// Create an operator chain
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {
    // ID of the node that starts this chain
    Integer startNodeId = chainInfo.getStartNodeId();
    // builtVertices holds the IDs of StreamNodes that have already been built, to avoid building them twice
    if (!builtVertices.contains(startNodeId)) {
        // transitiveOutEdges holds the out edges of the whole operator chain
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // chainableOutputs holds every StreamEdge that can be chained
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // nonChainableOutputs holds every StreamEdge that cannot be chained
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        // The StreamNode currently being processed
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        // Split the out edges into chainable and non-chainable ones
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        // For chainable edges, call createChain recursively and collect the result in transitiveOutEdges.
        // The recursion ends when:
        // 1. the current node has no more out edges;
        // 2. the chain starting at startNodeId has already been built.
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }
        // Non-chainable edges are added to transitiveOutEdges, and a new chain is started at their target
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }
        // Set the name of the operator chain
        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        // Set the minimum resources the chain needs
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // Set the preferred resources for the chain
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));
        // Add the current node to the chain and obtain its OperatorID
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }
        // If currentNodeId equals startNodeId, the current node is the head of a chain
        // and a JobVertex is created for it; otherwise only an empty StreamConfig is created
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());
        // Write the vertex properties into the config
        setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());
        if (currentNodeId.equals(startNodeId)) {
            // Mark this node as the start of a chain
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // Connect this chain to the downstream chains through its out edges
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            // Record the out edges in order and attach the configs of the chained operators
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}
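The recursion in createChain is easier to follow on a toy graph. The sketch below is a simplified model in plain Java, not the Flink implementation: ChainBuilderSketch, Edge and the chains map are hypothetical names, chainability is given as a flag on each edge instead of being computed, and names, resources, and configs are left out. It keeps only the skeleton: follow chainable edges within the current chain, start a new chain at the target of each non-chainable edge, and return the edges that leave the chain.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model only: a stripped-down imitation of the createChain recursion.
class ChainBuilderSketch {
    static final class Edge {
        final int source;
        final int target;
        final boolean chainable; // assumption: decided up front instead of by isChainable

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    private final Set<Integer> builtVertices = new HashSet<>();
    // chain head ID -> IDs of the nodes merged into that chain
    final Map<Integer, List<Integer>> chains = new TreeMap<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>())
                .add(new Edge(source, target, chainable));
    }

    // Returns the edges that leave the chain starting at startNodeId.
    List<Edge> createChain(int currentNodeId, int startNodeId) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);
        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> edges = outEdges.getOrDefault(currentNodeId, Collections.emptyList());
        for (Edge edge : edges) {
            if (edge.chainable) {
                // Stay in the same chain and keep collecting its out edges.
                transitiveOutEdges.addAll(createChain(edge.target, startNodeId));
            }
        }
        for (Edge edge : edges) {
            if (!edge.chainable) {
                // The edge leaves the chain; its target becomes the head of a new chain.
                transitiveOutEdges.add(edge);
                createChain(edge.target, edge.target);
            }
        }
        if (currentNodeId == startNodeId) {
            builtVertices.add(startNodeId); // the head is done: one vertex per chain
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        ChainBuilderSketch sketch = new ChainBuilderSketch();
        sketch.addEdge(1, 2, true);  // source -> map
        sketch.addEdge(2, 3, true);  // map -> filter
        sketch.addEdge(3, 4, false); // filter -> aggregation (repartitioned)
        sketch.addEdge(4, 5, true);  // aggregation -> sink
        List<Edge> leaving = sketch.createChain(1, 1);
        System.out.println(sketch.chains);  // prints {1=[1, 2, 3], 4=[4, 5]}
        System.out.println(leaving.size()); // prints 1
    }
}
```

Five nodes collapse into two chains, and the single edge returned for the first chain (3 to 4) is the one that would become a connection between two vertices.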
// Decide whether an edge can be part of an operator chain
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    // The downstream node must have exactly one input edge
    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    // Same slot sharing group, chainable operators, forward partitioner,
    // non-batch shuffle mode, equal parallelism, and chaining enabled
    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {
        return false;
    }
    // No other input edge of the downstream node may use the same type number
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}

/*------------------------ StreamNode ---------------------------*/
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
    return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
            || (slotSharingGroup != null
                    && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
}
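The conditions checked by isChainable and isChainableInput can be condensed into a small predicate. The sketch below is a simplified model under stated assumptions: ChainabilitySketch, Node and the Partitioner enum are hypothetical names, and the operator-level check (areOperatorsChainable), the shuffle mode check, and the type number check are deliberately left out. It covers only the structural conditions: a single input edge, the same slot sharing group, a forward partitioner, equal parallelism, and chaining enabled.

```java
import java.util.Objects;

// Toy model only: a condensed version of the structural chaining conditions.
class ChainabilitySketch {
    enum Partitioner { FORWARD, REBALANCE, HASH }

    static final class Node {
        final String slotSharingGroup; // may be null, as in the listing above
        final int parallelism;
        final int inEdgeCount;

        Node(String slotSharingGroup, int parallelism, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    static boolean isChainable(
            Node upstream, Node downstream, Partitioner partitioner, boolean chainingEnabled) {
        return downstream.inEdgeCount == 1
                // both null, or both set to the same group
                && Objects.equals(upstream.slotSharingGroup, downstream.slotSharingGroup)
                && partitioner == Partitioner.FORWARD
                && upstream.parallelism == downstream.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node map = new Node("default", 4, 1);
        Node filter = new Node("default", 4, 1);
        Node sink = new Node("default", 1, 1);
        System.out.println(isChainable(map, filter, Partitioner.FORWARD, true)); // prints true
        System.out.println(isChainable(map, filter, Partitioner.HASH, true));    // prints false
        System.out.println(isChainable(filter, sink, Partitioner.FORWARD, true)); // prints false
    }
}
```

The last case fails only because the parallelism differs, which is why changing the parallelism of one operator in a pipeline is enough to break a chain.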
/*--------------- StreamingJobGraphGenerator ------------------*/
private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {
    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();
    // Iterate over all source StreamNodes
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        // Look up the StreamNode by its ID
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);
        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
                && sourceNode.getOutEdges().size() == 1) {
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
            final ChainingStrategy targetChainingStrategy =
                    target.getOperatorFactory().getChainingStrategy();
            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                    && isChainableInput(sourceOutEdge, streamGraph)) {
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
                final StreamC