FlinkFlink 源码之ExecutionGraph

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 源码之ExecutionGraph相关的知识,希望对你有一定的参考价值。

1.概述

以前的一个老文章基于 Flink 1.9版本的,现在是基于flink 1.13版本的。

参考:95-230-028-源码-WordCount走读-获取ExecutionGraph

本文转载:Flink源码分析系列文档目录

2.从JobGraph到ExecutionGraph

JobGraph通过Dispatcher.submitJob方法提交。这是后续流程的入口方法。该方法调用了Dispatcher.internalSubmitJob,然后是Dispatcher.persistAndRunJob。

Dispatcher.persistAndRunJob方法存储并执行作业。如下所示:

private void persistAndRunJob(JobGraph jobGraph) throws Exception 
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);

Dispatcher.runJob接收JobGraph和执行类型两个参数。执行类型有两种:提交任务(SUBMISSION)和恢复任务(RECOVERY)。

private void runJob(JobGraph jobGraph, ExecutionType executionType) 
    // 确保JobID对应的这个作业目前不在运行状态,避免重复提交
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // 获取启动时时间戳
    long initializationTimestamp = System.currentTimeMillis();
    // 这里将JobManagerRunner创建出来
    // JobManagerRunner接下来会构造出JobManager
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
        createJobManagerRunner(jobGraph, initializationTimestamp);

    // 包装JobGraph相关信息供Dispatcher使用
    DispatcherJob dispatcherJob =
        DispatcherJob.createFor(
        jobManagerRunnerFuture,
        jobGraph.getJobID(),
        jobGraph.getName(),
        initializationTimestamp);
    
    // 将当前作业的ID加入runningJob集合
    // 表示当前作业已处于运行状态
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // 处理Job派发结果
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
        dispatcherJob
        .getResultFuture()
        .handleAsync(
        (dispatcherJobResult, throwable) -> 
            Preconditions.checkState(
                runningJobs.get(jobId) == dispatcherJob,
                "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

            if (dispatcherJobResult != null) 
                return handleDispatcherJobResult(
                    jobId, dispatcherJobResult, executionType);
             else 
                return dispatcherJobFailed(jobId, throwable);
            
        ,
        getMainThreadExecutor());

    // 作业停止的时候,将JobID从runningJob中移除
    final CompletableFuture<Void> jobTerminationFuture =
        cleanupJobStateFuture
        .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
        .thenCompose(Function.identity());

    // 将作业ID和对应的作业停止future加入到dispatcherJobTerminationFutures集合维护
    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);

接下来是Dispatcher.createJobManagerRunner方法。

JobManager在Dispatcher中被创建出来,然后启动。创建JobManager的逻辑在createJobManagerRunner方法中,如下所示:

CompletableFuture<JobManagerRunner> createJobManagerRunner(
    JobGraph jobGraph, long initializationTimestamp) 
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
        () -> 
            try 
                // 使用工厂类创建JobManager
                // 传入了JobGraph和高可用服务
                JobManagerRunner runner =
                    jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(
                        jobManagerMetricGroup),
                    fatalErrorHandler,
                    initializationTimestamp);
                // 启动JobManager
                // 实际上为启动JobManager的leader选举服务,选出JM主节点
                runner.start();
                return runner;
             catch (Exception e) 
                throw new CompletionException(
                    new JobInitializationException(
                        jobGraph.getJobID(),
                        "Could not instantiate JobManager.",
                        e));
            
        ,
        ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
    // JobManager creation

此时,JobManager开始进行leader竞选活动。为了确保JobManager不存在单点故障问题,Flink设计了JobManager 高可用,可以同时运行多个JobManager实例。在Standalone部署方式中,JobManager的竞选通过Zookeeper来实现。Yarn集群模式下则通过Yarn的ApplicationMaster失败后自动重启动方式来确保JobManager的高可用。有关leader选举的内容请参见Flink 源码之leader选举(Zookeeper方式)。

一旦leader JM被选举出来,选举服务会调用对应JM的grantLeadership方法。该方法内容如下所示:

@Override
public void grantLeadership(final UUID leaderSessionID) 
    synchronized (lock) 
        if (shutdown) 
            log.debug(
                "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        

        leadershipOperation =
            leadershipOperation.thenRun(
            ThrowingRunnable.unchecked(
                () -> 
                    synchronized (lock) 
                        // 主要逻辑是这个
                        // 检查作业调度状态并启动JobManager
                        verifyJobSchedulingStatusAndStartJobManager(
                            leaderSessionID);
                    
                ));

        handleException(leadershipOperation, "Could not start the job manager.");
    

接着我们跟踪到verifyJobSchedulingStatusAndStartJobManager方法。

@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
    throws FlinkException 
    // 如果JobManager已停止,直接返回
    if (shutdown) 
        log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
        return;
    

    // 从JobRegistry中获取Job调度状态
    final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
        getJobSchedulingStatus();

    // 如果作业已执行完毕
    // 调用作业执行完毕逻辑(实际上是作业未被当前JobManager完成运行的逻辑)
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) 
        jobAlreadyDone();
     else 
        // 启动JobMaster
        startJobMaster(leaderSessionId);
    

现在逻辑流转到了JobManagerRunnerImpl.startJobMaster方法。

该方法启动JobMaster。注册JobGraph,启动JobMaster服务并确认该JobMaster为leader。

@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException 
    log.info(
        "JobManager runner for job  () was granted leadership with session id .",
        jobGraph.getName(),
        jobGraph.getJobID(),
        leaderSessionId);

    try 
        // 注册JobGraph
        // 根据集群部署形式(Standalone,Zookeeper或K8s),采用不同的方式存储JobID
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
     catch (IOException e) 
        throw new FlinkException(
            String.format(
                "Failed to set the job %s to running in the running jobs registry.",
                jobGraph.getJobID()),
            e);
    

    // 启动JobMaster服务
    startJobMasterServiceSafely(leaderSessionId);

    // 确认该JobMaster是leader状态
    if (jobMasterService != null) 
        confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
    

JobManagerRunnerImpl.startJobMasterServiceSafely紧接着通过

DefaultJobMasterServiceFactory.createJobMasterService方法,创建出JobMaster并启动他的Rpc通信服务。

接下来。在JobMaster构造函数中存在构建Flink作业任务调度器的逻辑。JobMaster.createScheduler方法调用了

DefaultSlotPoolServiceSchedulerFactory.createScheduler创建Flink的调度器。该方法又调用了Scheduler工厂类的创建Scheduler实例这个方法DefaultSchedulerFactory.createInstance。

接下来的流程到了DefaultScheduler中。DefaultScheduler是Flink作业调度器的默认实现。它继承了SchedulerBase,SchedulerBase又实现了SchedulerNG接口。

SchedulerBase构造函数中调用了createAndRestoreExecutionGraph方法。

SchedulerBase.createAndRestoreExecutionGraph代码如下所示:

private ExecutionGraph createAndRestoreExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker,
    ExecutionDeploymentTracker executionDeploymentTracker,
    long initializationTimestamp,
    ComponentMainThreadExecutor mainThreadExecutor,
    JobStatusListener jobStatusListener)
    throws Exception 

    // 创建ExecutionGraph
    ExecutionGraph newExecutionGraph =
        createExecutionGraph(
        currentJobManagerJobMetricGroup,
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        shuffleMaster,
        partitionTracker,
        executionDeploymentTracker,
        initializationTimestamp);

    // 获取ExecutionGraph中创建出的CheckpointCoordinator
    // 创建CheckpointCoordinator的过程后面章节有说明
    final CheckpointCoordinator checkpointCoordinator =
        newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) 
        // check whether we find a valid checkpoint
        // 检查是否存在一个最近的checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
            new HashSet<>(newExecutionGraph.getAllVertices().values()))) 

            // check whether we can restore from a savepoint
            // 如果有,尝试从这个检查点恢复
            tryRestoreExecutionGraphFromSavepoint(
                newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        
    

    // 设置任务失败监听器
    newExecutionGraph.setInternalTaskFailuresListener(
        new UpdateSchedulerNgOnInternalFailuresListener(this));
    // 设置作业状态监听器
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    // 设置JobMaster的主线程ThreadExecutor
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;

SchedulerBase.createExecutionGraph方法调用DefaultExecutionGraphBuilder,创建出ExecutionGraph。代码如下所示:

private ExecutionGraph createExecutionGraph(
    JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    ShuffleMaster<?> shuffleMaster,
    final JobMasterPartitionTracker partitionTracker,
    ExecutionDeploymentTracker executionDeploymentTracker,
    long initializationTimestamp)
    throws JobExecutionException, JobException 

    // 创建Execution部署监听器
    ExecutionDeploymentListener executionDeploymentListener =
        new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    //创建Execution状态更新监听器
    ExecutionStateUpdateListener executionStateUpdateListener =
        (execution, newState) -> 
        if (newState.isTerminal()) 
            executionDeploymentTracker.stopTrackingDeploymentOf(execution);
        
    ;

    // 创建ExecutionGraph
    return DefaultExecutionGraphBuilder.buildGraph(
        jobGraph,
        jobMasterConfiguration,
        futureExecutor,
        ioExecutor,
        userCodeLoader,
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        rpcTimeout,
        currentJobManagerJobMetricGroup,
        blobWriter,
        log,
        shuffleMaster,
        partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
            jobGraph.getJobType()),
        executionDeploymentListener,
        executionStateUpdateListener,
        initializationTimestamp,
        new DefaultVertexAttemptNumberStore());

ExecutionGraph相关概念
ExecutionGraph为Flink作业的物理执行计划。用来协调数据流的分布式执行过程。

和StreamGraph,JobGraph不同的是,ExecutionGraph是在JobManager中生成。

从ExecutionGraph也有顶点(Vertex)的概念,ExecutionGraph中的vertex为ExecutionJobVertex,和JobGraph中的JobVertex对应。从ExecutionGraph到JobGraph的过程中加入了并行度的概念,ExecutionJobVertex包含了与之对应的JobVertex中所有的并行任务。ExecutionJobVertex之中每一个并行的任务由ExecutionVertex代表。也就是说一个ExecutionJobVertex具有多少并行度,它下面就包含多少个ExecutionVertex。

ExecutionVertex可以被执行一次或多次(由于任务恢复,重计算或更新配置)ExecutionVertex的每一次执行都会生成一个Execution对象。Execution负责跟踪ExecutionVertex的任务执行状态变化和资源使用状况。

IntermediateResult和JobGraph中JobVertex的IntermediateDataSet的概念对应,用于表示两个相邻的ExecutionJobVertex之间数据传输过程中的临时存放点。IntermediateResult在ExecutionJobVertex创建的时候被构建出来,数量和该vertex的并行度一致。

DefaultExecutionGraphBuilder的buildGraph方法

public static ExecutionGraph buildGraph(
    JobGraph jobGraph,
    Configuration jobManagerConfig,
    ScheduledExecutorService futureExecutor,
    Executor ioExecutor,
    ClassLoader classLoader,
    CompletedCheckpointStore completedCheckpointStore,
    CheckpointsCleaner checkpointsCleaner,
    CheckpointIDCounter checkpointIdCounter,
    Time rpcTimeout,
    MetricGroup metrics,
    BlobWriter blobWriter,
    Logger log,
    ShuffleMaster<?> shuffleMaster,
    JobMasterPartitionTracker partitionTracker,
    TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
    ExecutionDeploymentListener executionDeploymentListener,
    ExecutionStateUpdateListener executionStateUpdateListener,
    long initializationTimestamp,
    VertexAttemptNumberStore vertexAttemptNumberStore)
    throws JobExecutionException, JobException 

    checkNotNull(jobGraph, "job graph cannot be null");

    // 获取作业名称和作业ID
    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    // 创建JobInformation
    // JobInformation为ExecutionGraph中的job相关配置信息的封装类
    final JobInformation jobInformation =
        new JobInformation(
        jobId,
        jobName,
        jobGraph.getSerializedExecutionConfig(),
        jobGraph.getJobConfiguration(),
        jobGraph.getUserJarBlobKeys(),
        jobGraph.getClasspaths());

    // 获取保留在历史记录中的最大重试次数
    final int maxPriorAttemptsHistoryLength =
        jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    // 获取IntermediateResultPartition释放策略工厂类
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
        PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
        jobManagerConfig);

    // create a new execution graph, if none exists so far
    // 创建ExecutionGraph,后面章节分析
    final DefaultExecutionGraph executionGraph;
    try 
            executionGraph =
                    new DefaultExecutionGraph(
                            jobInformation,
                            futureExecutor,
                            ioExecutor,
                            rpcTimeout,
                            maxPriorAttemptsHistoryLength,
                            classLoader,
                            blobWriter,
                            partitionReleaseStrategyFactory,
                            shuffleMaster,
                            partitionTracker,
                            partitionLocationConstraint,
                            executionDeploymentListener,
                            executionStateUpdateListener,
                            initializationTimestamp,
                            vertexAttemptNumberStore);
     catch (IOException e) 
        throw new JobException("Could not create the ExecutionGraph.", e);
    

    // set the basic properties

    try 
        // 设置json格式的执行计划
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
     catch (Throwable t) 
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        // 如果根据jobGraph生成json执行计划失败,设置一个空的执行计划
        executionGraph.setJsonPlan("");
    

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits

    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job  ().", jobName, jobId);

    for (JobVertex vertex : jobGraph.getVertices()) 
        // 获取节点调用的类名,即节点的task
        String executableClass = vertex.getInvokableClassName();
        // 确保每个节点的调用类必须存在
        if (executableClass == null || executableClass.isEmpty()) 
            throw new JobSubmissionException(
                jobId,
                "The vertex "
                + vertex.getID()
                + " ("
                + vertex.getName()
                + ") has no invokable class.");
        

        try 
            // 根据不同的节点类型,调用job启动时节点的任务逻辑
            vertex.initializeOnMaster(classLoader);
         catch (Throwable t) 
            throw new JobExecutionException(
                jobId,
                "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                t);
        
    

    log.info(
        "Successfully ran initialization on master in  ms.",
        (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    // 按照拓扑结构(数据流的顺序)排序,获取所有的Job顶点
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) 
        log.debug(
            "Adding  vertices from job graph  ().",
            sortedTopology.size(),
            jobName,
            jobId);
    
    // executionGraph绑定所有的Job节点
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) 
        log.debug(
            "Successfully created execution graph from job graph  ().", jobName, jobId);
    

    // configure the state checkpointing
    // 配置checkpoint
    // 如果启用了checkpoint
    if (isCheckpointingEnabled(jobGraph))

以上是关于FlinkFlink 源码之ExecutionGraph的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink 源码之ExecutionGraph

FlinkFlink 源码之RPC调用

FlinkFlink 源码之Buffer Debloating

FlinkFlink 源码之AsyncFunction异步 IO 源码

FlinkFlink 源码之OperatorChain

FlinkFlink 源码之 安全认证