Flink: JobGraph in Detail

Posted 小猪猪家的大猪猪


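Overview

JobGraph is the optimized form of StreamGraph; the client sends the optimized JobGraph to the JobManager (JM). The rest of this article assumes some background, so if you have not read the earlier posts in this series, it is best to go through them first:

  1. [Flink] StreamGraph in Detail
  2. [Flink] A Brief Look at Flink Architecture and Scheduling
  3. [Flink] The Eight Partitioning Strategies in Flink

Flink converts the StreamGraph object into a JobGraph object on the client. The core of this conversion is merging multiple StreamNode nodes that satisfy certain conditions into a single JobVertex node. This optimization is called operator chaining, and it removes the serialization and deserialization that would otherwise be needed to pass data between nodes. Operators in the same chain run in the same TaskSlot, which can also be understood as running in a single thread, so chaining reduces thread-switching overhead, increases throughput, and lowers latency.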

Source Code Analysis

Building the JobGraph

JobGraph itself is defined in the flink-runtime module as org.apache.flink.runtime.jobgraph.JobGraph. The call chain that builds it is StreamGraph#getJobGraph() -> StreamingJobGraphGenerator#createJobGraph().

/*------------------------ StreamGraph ---------------------------*/
// Entry point for building the JobGraph
public JobGraph getJobGraph() {
    return getJobGraph(null);
}

public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
/*---------------------------------------------------------*/

/*--------------- StreamingJobGraphGenerator ------------------*/

public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}


private JobGraph createJobGraph() {
    // Validate the StreamGraph before converting it
    preValidate();
    // Copy the job type (BATCH or STREAMING) from the StreamGraph to the JobGraph
    jobGraph.setJobType(streamGraph.getJobType());

    // Set whether approximate local recovery is enabled for the JobGraph
    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate a deterministic hash for every StreamNode
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // This is the key step: the vertices and edges of the JobGraph are created here.
    // It tries to merge as many StreamNodes as possible into a single JobGraph vertex:
    // it decides which operators can be chained, creates the JobVertex objects,
    // and generates the JobEdge objects.
    setChaining(hashes, legacyHashes);
    // Set the physical edges
    setPhysicalEdges();
    // Set the SlotSharingGroup and CoLocationGroup of the JobGraph
    setSlotSharingAndCoLocation();

    // Set the managed memory fractions of the operators
    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // Set the checkpoint information for each JobVertex of the JobGraph.
    // For example, source JobVertices need to trigger checkpoints,
    // and all JobVertices need to commit and ack checkpoints.
    configureCheckpointing();

    // Set the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    // Register the user artifacts (distributed cache entries) with the JobGraph
    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // Set the runtime execution configuration
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    // Return the JobGraph object
    return jobGraph;
}


/*---------------------------------------------------------*/

// Enum that defines the type of a Flink job
public enum JobType {
    // Batch processing mode
    BATCH,
    // Stream processing mode
    STREAMING
}

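As the analysis above shows, the most important step in going from StreamGraph to JobGraph is building the operator chains in setChaining(hashes, legacyHashes). Chaining packs as many operations as possible into a single node and avoids unnecessary thread switches and network communication. Take a simple example: DataStream.map(a -> a + 1).filter(a -> a > 2). This data stream has two processing steps, that is, two operators, map and filter. They become two separate StreamNode objects and, without chaining, two separate Task objects. If those two Tasks are not in the same TaskSlot or the same TM, records have to travel over the network, and performance suffers badly. To avoid this, Flink introduces the concept of an operator chain: a sequence of operators that can execute in the same TaskSlot.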
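
The effect described above can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: the class ChainingSketch and its helper roundTrip are hypothetical names, and Java serialization only stands in for the hand-over between two unchained tasks. It runs the map and filter steps from the example once with a serialization round trip between them and once as direct calls in the same thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChainingSketch {
    // Hypothetical stand-in for the hand-over between two unchained tasks:
    // the record is serialized and deserialized again.
    static Integer roundTrip(Integer value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Integer) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // map and filter as separate steps with a serialization round trip in between
    static List<Integer> runUnchained(List<Integer> input) {
        List<Integer> result = new ArrayList<>();
        for (Integer record : input) {
            Integer mapped = roundTrip(record + 1); // map(a -> a + 1), then hand over
            if (mapped > 2) {                       // filter(a -> a > 2)
                result.add(mapped);
            }
        }
        return result;
    }

    // map and filter fused: the mapped value is passed on by a plain method-local variable
    static List<Integer> runChained(List<Integer> input) {
        List<Integer> result = new ArrayList<>();
        for (Integer record : input) {
            int mapped = record + 1;
            if (mapped > 2) {
                result.add(mapped);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3);
        System.out.println(runUnchained(input)); // prints [3, 4]
        System.out.println(runChained(input));   // prints [3, 4]
    }
}
```

Both variants produce the same result; the chained one simply skips the per-record serialization work, which is the cost that operator chaining is meant to remove.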

/*--------------- StreamingJobGraphGenerator ------------------*/
// Recursively create JobVertex objects starting from the StreamNodes
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // Collect the entry point of every chain, keyed by node ID
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);

    // Order the entry points by node ID
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // Create the operator chains
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}
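
The sorting step in setChaining is plain Java stream code and can be tried on its own. The sketch below applies the same pipeline to a Map<Integer, String>; the class EntryPointOrderSketch and the String values are hypothetical stand-ins for OperatorChainInfo. Since a HashMap gives no ordering guarantee, sorting by key presumably serves to visit the entry points in a reproducible order.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class EntryPointOrderSketch {
    // Same pipeline as in setChaining: sort the entries by key, keep only the values.
    static List<String> orderedValues(Map<Integer, String> entryPoints) {
        return entryPoints.entrySet().stream()
                .sorted(Comparator.comparing(Map.Entry::getKey))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, String> entryPoints = new HashMap<>();
        entryPoints.put(3, "chain-c");
        entryPoints.put(1, "chain-a");
        entryPoints.put(2, "chain-b");
        System.out.println(orderedValues(entryPoints)); // prints [chain-a, chain-b, chain-c]
    }
}
```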
      



// Create an operator chain
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

    // ID of the node that starts the current chain
    Integer startNodeId = chainInfo.getStartNodeId();
    // builtVertices holds the IDs of StreamNodes that have already been built, to avoid building them twice
    if (!builtVertices.contains(startNodeId)) {
        // transitiveOutEdges holds the out edges of the whole operator chain
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // chainableOutputs holds every StreamEdge that can be part of an operator chain
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // nonChainableOutputs holds every StreamEdge that cannot be part of an operator chain
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        // The StreamNode currently being processed
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // Split the out edges into chainable and non-chainable ones
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // For chainable edges, call createChain recursively and add the result to transitiveOutEdges.
        // The recursion ends when:
        // 1. the current node has no more out edges;
        // 2. the current node has already been converted
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

        // Non-chainable edges are added to transitiveOutEdges directly,
        // and a new chain is started at their target node
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        // Set the name of the operator chain
        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                        Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        // Set the minimum resources required by the operator chain
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // Set the preferred resources of the operator chain
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        // Add the current node to the chain and obtain its OperatorID
        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // If currentNodeId equals startNodeId, this node is the head of a chain and a JobVertex
        // is created for it; otherwise only a plain StreamConfig is created
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());

        // Write the vertex properties into the config
        setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());

        if (currentNodeId.equals(startNodeId)) {
            // Mark the start of a new operator chain
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // Connect the chain to each out edge that leads to the next chain
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            // Store the out edges of the chain and the configs of the chained operators
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}
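
The recursion in createChain is easier to follow on a toy graph. The sketch below is a heavily simplified model, not the Flink implementation: ChainGroupingSketch, Edge, and the boolean chainable flag are hypothetical, the flag replaces the isChainable check, and nodes are tracked per visited node instead of per chain head. It only shows how chainable edges extend the current chain while non-chainable edges start a new one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ChainGroupingSketch {
    // Hypothetical, simplified edge: only the target node and a precomputed chainable flag.
    static final class Edge {
        final int target;
        final boolean chainable;

        Edge(int target, boolean chainable) {
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    private final Set<Integer> builtNodes = new HashSet<>();
    // chain head ID -> IDs of all nodes merged into that chain, in visit order
    final Map<Integer, List<Integer>> chains = new TreeMap<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(target, chainable));
    }

    void createChain(int currentNodeId, int startNodeId) {
        // Skip nodes that were already assigned to a chain
        if (!builtNodes.add(currentNodeId)) {
            return;
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);
        for (Edge edge : outEdges.getOrDefault(currentNodeId, Collections.emptyList())) {
            if (edge.chainable) {
                // A chainable edge keeps the same chain head
                createChain(edge.target, startNodeId);
            } else {
                // A non-chainable edge makes its target the head of a new chain
                createChain(edge.target, edge.target);
            }
        }
    }

    public static void main(String[] args) {
        ChainGroupingSketch sketch = new ChainGroupingSketch();
        sketch.addEdge(1, 2, true);   // e.g. source -> map
        sketch.addEdge(2, 3, true);   // map -> filter
        sketch.addEdge(3, 4, false);  // filter -> downstream operator behind a repartitioning
        sketch.addEdge(4, 5, true);   // downstream operator -> sink
        sketch.createChain(1, 1);
        System.out.println(sketch.chains); // prints {1=[1, 2, 3], 4=[4, 5]}
    }
}
```

In this model each key of chains corresponds to one JobVertex, and each non-chainable edge corresponds to a connection between two JobVertices.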
        
    

// Decide whether an edge can be part of an operator chain
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    // The downstream node must have exactly one input edge
    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    // All of the following must hold: same slot sharing group, chainable operators,
    // forward partitioner, non-BATCH shuffle mode, equal parallelism, chaining enabled
    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {

        return false;
    }

    // No other input edge of the downstream node may have the same type number
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}
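
The conditions above can be condensed into a small standalone predicate. The following sketch is an assumption-laden simplification, not Flink's API: ChainableCheckSketch and Node are hypothetical, the partitioner check is reduced to a boolean, and the areOperatorsChainable, shuffle-mode, and type-number checks are left out. Objects.equals performs the same null-safe comparison that StreamNode#isSameSlotSharingGroup spells out by hand.

```java
import java.util.Objects;

final class ChainableCheckSketch {
    // Hypothetical, simplified node: only the fields that the sketch inspects.
    static final class Node {
        final String slotSharingGroup; // may be null
        final int parallelism;
        final int inEdgeCount;

        Node(String slotSharingGroup, int parallelism, int inEdgeCount) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    static boolean isChainable(
            Node upstream, Node downstream, boolean forwardPartitioner, boolean chainingEnabled) {
        return downstream.inEdgeCount == 1                                              // single input
                && Objects.equals(upstream.slotSharingGroup, downstream.slotSharingGroup) // same group
                && forwardPartitioner                                                   // no repartitioning
                && upstream.parallelism == downstream.parallelism                       // equal parallelism
                && chainingEnabled;                                                     // not disabled globally
    }

    public static void main(String[] args) {
        Node map = new Node("default", 4, 1);
        Node filter = new Node("default", 4, 1);
        Node rescaled = new Node("default", 2, 1);
        System.out.println(isChainable(map, filter, true, true));    // prints true
        System.out.println(isChainable(map, rescaled, true, true));  // prints false
        System.out.println(isChainable(map, filter, false, true));   // prints false
    }
}
```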


// Defined in StreamNode: two nodes share a slot sharing group if both groups are null
// or both are equal
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
    return (slotSharingGroup == null && downstreamVertex.slotSharingGroup == null)
            || (slotSharingGroup != null
                    && slotSharingGroup.equals(downstreamVertex.slotSharingGroup));
}


private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
        final Map<Integer, byte[]> hashes, final List<Map<Integer, byte[]>> legacyHashes) {

    final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
    final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>();

    // Iterate over all source StreamNodes
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        // Look up the StreamNode object by its ID
        final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);

        if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory
                && sourceNode.getOutEdges().size() == 1) {
            final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
            final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
            final ChainingStrategy targetChainingStrategy =
                    target.getOperatorFactory().getChainingStrategy();

            if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
                    && isChainableInput(sourceOutEdge, streamGraph)) {
                final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
                final StreamC
