[Flink] Flink 1.12.2 Task Scheduling (Source Code)

Posted by 九师兄




1. Overview

Reprinted from: Flink 1.12.2 Task Scheduling (Source Code)

1. Preface

After the ExecutionGraph has been generated, Flink can create concrete Tasks from it and schedule them onto TaskManagers for execution.


2. Concepts

The scheduler is the core component of Flink job execution. It manages every aspect of running a job: converting the JobGraph into an ExecutionGraph, managing the job lifecycle (submitting, cancelling, stopping the job), managing the lifecycle of the job's Tasks (deploying, cancelling, stopping Tasks), requesting and releasing resources, and handling failover of the job and its Tasks.

Scheduling involves several important components:

  • Scheduler: SchedulerNG and its subclasses / implementation classes
  • Scheduling strategy: SchedulingStrategy and its implementations
  • Schedule mode: ScheduleMode, which covers streaming and batch, each with its own scheduling modes (a simplified excerpt of the enum is shown below)
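For reference, the ScheduleMode enum mentioned above (org.apache.flink.runtime.jobgraph.ScheduleMode) looks roughly as follows in Flink 1.12. This is a simplified excerpt reproduced from memory with my own comments, so treat the actual source as authoritative:

    public enum ScheduleMode {

        // batch: schedule a task only once its input data is ready
        LAZY_FROM_SOURCES(true),

        // batch with batch-style slot requests
        LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST(true),

        // streaming: schedule all tasks immediately
        EAGER(false);

        private final boolean allowLazyDeployment;

        ScheduleMode(boolean allowLazyDeployment) {
            this.allowLazyDeployment = allowLazyDeployment;
        }

        // whether tasks may be deployed lazily (batch) or must be deployed eagerly (streaming)
        public boolean allowLazyDeployment() {
            return allowLazyDeployment;
        }
    }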

2.1. Scheduler

The scheduler is responsible for:
1) Job lifecycle management, such as submitting, suspending and cancelling the job
2) Requesting, allocating and releasing the resources needed to execute the job
3) Job state management, i.e. the state transitions while the job is being deployed and failover when the job fails
4) Providing job information, exposing detailed information about the job to the outside

Implementation class: DefaultScheduler
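To make these responsibilities concrete, here is a trimmed-down sketch of the SchedulerNG interface that DefaultScheduler implements. Imports are omitted and only a subset of methods is shown; the methods that do not appear in the walkthrough below (suspend, cancel, handleGlobalFailure, updateTaskExecutionState, requestJob) are reproduced from memory of the 1.12 source and may differ in detail:

    public interface SchedulerNG {

        void setMainThreadExecutor(ComponentMainThreadExecutor mainThreadExecutor);

        void registerJobStatusListener(JobStatusListener jobStatusListener);

        // job lifecycle: start scheduling, suspend, cancel
        void startScheduling();

        void suspend(Throwable cause);

        void cancel();

        CompletableFuture<Void> getTerminationFuture();

        // failover entry points
        void handleGlobalFailure(Throwable cause);

        boolean updateTaskExecutionState(TaskExecutionState taskExecutionState);

        // job information
        JobStatus requestJobStatus();

        ArchivedExecutionGraph requestJob();
    }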

2.2. Scheduling Strategy

SchedulingStrategy is an interface that defines four methods:

  • void startScheduling(): the scheduling entry point; triggers the scheduler's scheduling behavior
  • void restartTasks(Set<ExecutionVertexID> verticesToRestart): restarts Tasks that failed, usually because a Task threw an exception
  • void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState): called when an Execution changes state
  • void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId): called when the data of an IntermediateResultPartition becomes consumable
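Put together as an interface, this is essentially the following (imports omitted; the Javadoc is replaced by my own short comments):

    public interface SchedulingStrategy {

        // entry point: triggers the initial scheduling of the job
        void startScheduling();

        // re-schedules the given vertices, typically after a Task failure
        void restartTasks(Set<ExecutionVertexID> verticesToRestart);

        // called when an Execution changes state (e.g. DEPLOYING -> RUNNING)
        void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState);

        // called when the data of an IntermediateResultPartition becomes consumable
        void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);
    }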

SchedulingStrategy has three implementations:

  • EagerSchedulingStrategy: used for streaming jobs; schedules all tasks at the same time
  • LazyFromSourcesSchedulingStrategy: used for batch jobs; schedules vertices once their input data is ready (i.e. the upstream has finished)
  • PipelinedRegionSchedulingStrategy: schedules at the granularity of a pipelined region

PipelinedRegionSchedulingStrategy was added in 1.11, and starting from 1.12 scheduling is performed per pipelined region.

A pipelined region is a set of tasks connected by pipelined edges. This means that for a streaming job consisting of multiple regions, Flink no longer waits until all tasks have acquired slots before deploying any of them; instead, a region can be deployed as soon as it has obtained enough slots. For a batch job, slots are not requested and tasks are not deployed one by one; instead, once a region has acquired enough slots, its tasks are deployed together with all the other tasks in that region.
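As a concrete illustration (my own example, not from the original article): in the job below, executed in BATCH runtime mode, the shuffle introduced by keyBy is a blocking data exchange, so the source/flatMap tasks and the aggregation/sink tasks end up in different pipelined regions, and the downstream region is scheduled only after the upstream region has produced its blocking results. In STREAMING mode the same exchange is pipelined and the whole job typically forms a single region that is scheduled at once.

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class RegionSchedulingExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // BATCH runtime mode (FLIP-140): the keyBy shuffle becomes a blocking exchange,
            // so the job splits into multiple pipelined regions scheduled one after another.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            env.fromElements("to be or not to be")
                    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                        for (String word : line.split(" ")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(value -> value.f0) // region boundary in BATCH mode
                    .sum(1)
                    .print();

            env.execute("region-scheduling-example");
        }
    }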

3. Code Walkthrough

3.1. Call Chain

->    org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
 ->    org.apache.flink.runtime.jobmaster.JobMaster#resetAndStartScheduler
  ->    org.apache.flink.runtime.jobmaster.JobMaster#startScheduling
   ->    org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal
    ->    org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling
     ->    org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions
      ->    org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy
       ->    org.apache.flink.runtime.scheduler.DefaultScheduler#waitForAllSlotsAndDeploy
        ->    org.apache.flink.runtime.scheduler.DefaultScheduler#deployAll
         ->    org.apache.flink.runtime.scheduler.DefaultScheduler#deployOrHandleError
          ->    org.apache.flink.runtime.scheduler.DefaultScheduler#deployTaskSafe
           ->    org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations#deploy
            ->    org.apache.flink.runtime.executiongraph.ExecutionVertex#deploy
             ->    org.apache.flink.runtime.executiongraph.Execution#deploy
              ->    org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask
               ->    org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask
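If you want to follow this chain on a running cluster, one option is to raise the log level for the scheduler and jobmaster packages. This assumes the log4j2 properties-style conf/log4j.properties that Flink 1.12 ships by default (the logger ids "scheduler" and "jobmaster" are arbitrary names of my own); adjust to your logging setup:

    # conf/log4j.properties (log4j2 properties format)
    logger.scheduler.name = org.apache.flink.runtime.scheduler
    logger.scheduler.level = DEBUG
    logger.jobmaster.name = org.apache.flink.runtime.jobmaster
    logger.jobmaster.level = DEBUG

With DEBUG enabled you will, for example, see the "Refusing to deploy execution vertex ..." message from deployOrHandleError (3.11) whenever a deployment has been superseded.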


3.2. JobMaster#startJobExecution


    // ----------------------------------------------------------------------------------------------
    // Internal methods
    // ----------------------------------------------------------------------------------------------
    // -- job starting and stopping
    // -----------------------------------------------------------------

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        // actually start the JobMaster services
        startJobMasterServices();

        // Starting execution of job
        //      Socket Window WordCount (694474d11da6100e82744c9e47e2f511)
        // under job master id
        //      00000000000000000000000000000000.
        log.info(
                "Starting execution of job {} ({}) under job master id {}.",
                jobGraph.getName(),
                jobGraph.getJobID(),
                newJobMasterId);

        // reset & start the Scheduler ...
        resetAndStartScheduler();

        return Acknowledge.get();
    }


3.3. JobMaster#resetAndStartScheduler


    // start the Scheduler
    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> schedulerAssignedFuture;

        if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
        } else {
            suspendAndClearSchedulerFields(
                    new FlinkException(
                            "ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup =
                    jobMetricGroupFactory.create(jobGraph);



            final SchedulerNG newScheduler =
                    createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup);

            schedulerAssignedFuture =
                    schedulerNG
                            .getTerminationFuture()
                            .handle(
                                    (ignored, throwable) -> {
                                        newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                                        assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                                        return null;
                                    });
        }
        // kick off scheduling: startScheduling
        FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
    }

3.4. JobMaster#startScheduling


    private void startScheduling() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        schedulerNG.registerJobStatusListener(jobStatusListener);

        // start scheduling
        schedulerNG.startScheduling();
    }


3.5. DefaultScheduler#startSchedulingInternal

    @Override
    protected void startSchedulingInternal() {
        log.info(
                "Starting scheduling with scheduling strategy [{}]",
                schedulingStrategy.getClass().getName());
        prepareExecutionGraphForNgScheduling();

        // default scheduling strategy: PipelinedRegionSchedulingStrategy
        // PipelinedRegionSchedulingStrategy#startScheduling
        schedulingStrategy.startScheduling();
    }


3.6. PipelinedRegionSchedulingStrategy#startScheduling

    @Override
    public void startScheduling() {


        final Set<SchedulingPipelinedRegion> sourceRegions =
                IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                        .filter(region -> !region.getConsumedResults().iterator().hasNext())
                        .collect(Collectors.toSet());
        // schedule the source regions (regions that consume no upstream results)
        maybeScheduleRegions(sourceRegions);
    }

3.7. PipelinedRegionSchedulingStrategy#maybeScheduleRegions


    private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
        final List<SchedulingPipelinedRegion> regionsSorted =
                SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                        schedulingTopology, regions);
        for (SchedulingPipelinedRegion region : regionsSorted) {
            // try to schedule this region
            maybeScheduleRegion(region);
        }
    }

    private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
        if (!areRegionInputsAllConsumable(region)) {
            return;
        }

        checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");

        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        regionVerticesSorted.get(region), id -> deploymentOption);

        // next: DefaultScheduler#allocateSlotsAndDeploy
        schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }

3.8. DefaultScheduler#allocateSlotsAndDeploy


    // ------------------------------------------------------------------------
    // SchedulerOperations
    // ------------------------------------------------------------------------

    @Override
    public void allocateSlotsAndDeploy(
            final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        validateDeploymentOptions(executionVertexDeploymentOptions);


        //    {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" -> {ExecutionVertexDeploymentOption@7484}
        //    {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" -> {ExecutionVertexDeploymentOption@7485}
        //    {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2" -> {ExecutionVertexDeploymentOption@7486}
        //    {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0" -> {ExecutionVertexDeploymentOption@7487}
        //    {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3" -> {ExecutionVertexDeploymentOption@7488}
        //    {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" -> {ExecutionVertexDeploymentOption@7489}
        //    {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexDeploymentOption@7490}
        //    {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0" -> {ExecutionVertexDeploymentOption@7491}
        //    {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexDeploymentOption@7492}
        //    {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1" -> {ExecutionVertexDeploymentOption@7493}
        final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
                groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

        //    0 = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
        //    1 = {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0"
        //    2 = {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1"
        //    3 = {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2"
        //    4 = {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3"
        //    5 = {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0"
        //    6 = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
        //    7 = {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2"
        //    8 = {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3"
        //    9 = {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0"
        final List<ExecutionVertexID> verticesToDeploy =
                executionVertexDeploymentOptions.stream()
                        .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                        .collect(Collectors.toList());


        //    {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" -> {ExecutionVertexVersion@7566}
        //          key = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
        //          value = {ExecutionVertexVersion@7566}
        //              executionVertexId = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
        //              version = 1
        //    {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" -> {ExecutionVertexVersion@7567}
        //          key = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
        //          value = {ExecutionVertexVersion@7567}
        //                executionVertexId = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
        //                version = 1
        //    {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2" -> {ExecutionVertexVersion@7568}
        //    {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0" -> {ExecutionVertexVersion@7569}
        //    {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3" -> {ExecutionVertexVersion@7570}
        //    {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" -> {ExecutionVertexVersion@7571}
        //    {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexVersion@7572}
        //    {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0" -> {ExecutionVertexVersion@7573}
        //    {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexVersion@7574}
        //    {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1" -> {ExecutionVertexVersion@7575}
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
                executionVertexVersioner.recordVertexModifications(verticesToDeploy);

        transitionToScheduled(verticesToDeploy);

        // allocate slots for the vertices to deploy
        //    slotExecutionVertexAssignments = {ArrayList@8148}  size = 10
        //        0 = {SlotExecutionVertexAssignment@8064}
        //        1 = {SlotExecutionVertexAssignment@8070}
        //        2 = {SlotExecutionVertexAssignment@8072}
        //        3 = {SlotExecutionVertexAssignment@8066}
        //        4 = {SlotExecutionVertexAssignment@8068}
        //        5 = {SlotExecutionVertexAssignment@8067}
        //        6 = {SlotExecutionVertexAssignment@8065}
        //        7 = {SlotExecutionVertexAssignment@8073}
        //        8 = {SlotExecutionVertexAssignment@8071}
        //        9 = {SlotExecutionVertexAssignment@8069}
        final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
                allocateSlots(executionVertexDeploymentOptions);

        // a DeploymentHandle bundles: the vertex version, the deployment option, and the slot assignment
        //    deploymentHandles = {ArrayList@8194}  size = 10
        //        0 = {DeploymentHandle@8212}
        //            requiredVertexVersion = {ExecutionVertexVersion@7566}
        //            executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484}
        //            slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8064}
        //        1 = {DeploymentHandle@8213}
        //            requiredVertexVersion = {ExecutionVertexVersion@7573}
        //            executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7491}
        //            slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8070}
        //        2 = {DeploymentHandle@8214}
        //        3 = {DeploymentHandle@8215}
        //        4 = {DeploymentHandle@8216}
        //        5 = {DeploymentHandle@8217}
        //        6 = {DeploymentHandle@8218}
        //        7 = {DeploymentHandle@8219}
        //        8 = {DeploymentHandle@8220}
        //        9 = {DeploymentHandle@8221}
        final List<DeploymentHandle> deploymentHandles =
                createDeploymentHandles(
                        requiredVersionByVertex,
                        deploymentOptionsByVertex,
                        slotExecutionVertexAssignments);

        // start the deployment ...
        waitForAllSlotsAndDeploy(deploymentHandles);
    }


3.9. DefaultScheduler#waitForAllSlotsAndDeploy

    private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
        // assign all resources, then deploy
        FutureUtils.assertNoException(
                assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
    }
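A detail worth noting: handle(...) runs its callback both when the future completes normally and when it completes exceptionally, which is why deployAll in the next step first calls propagateIfNonNull(throwable) instead of ignoring the second argument. A minimal, standalone illustration of this CompletableFuture behaviour (plain Java, not Flink code):

    import java.util.concurrent.CompletableFuture;

    public class HandleSemanticsDemo {
        public static void main(String[] args) {
            CompletableFuture<Void> failedAssignment = new CompletableFuture<>();
            failedAssignment.completeExceptionally(new RuntimeException("slot allocation failed"));

            // handle() is still invoked: the exception arrives as the second argument
            // and has to be re-thrown explicitly if it should abort the remaining steps.
            failedAssignment.handle((ignored, throwable) -> {
                if (throwable != null) {
                    System.out.println("saw failure: " + throwable.getMessage());
                }
                return null;
            });
        }
    }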

3.10. DefaultScheduler#deployAll


    private BiFunction<Void, Throwable, Void> deployAll(
            final List<DeploymentHandle> deploymentHandles) {


        return (ignored, throwable) -> {
            propagateIfNonNull(throwable);
            for (final DeploymentHandle deploymentHandle : deploymentHandles) {
                final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
                        deploymentHandle.getSlotExecutionVertexAssignment();
                final CompletableFuture<LogicalSlot> slotAssigned =
                        slotExecutionVertexAssignment.getLogicalSlotFuture();
                checkState(slotAssigned.isDone());
                // once the slot future completes: deployOrHandleError
                FutureUtils.assertNoException(
                        slotAssigned.handle(deployOrHandleError(deploymentHandle)));
            }
            return null;
        };
    }


3.11. DefaultScheduler#deployOrHandleError


    private BiFunction<Object, Throwable, Void> deployOrHandleError(
            final DeploymentHandle deploymentHandle) {
        final ExecutionVertexVersion requiredVertexVersion =
                deploymentHandle.getRequiredVertexVersion();
        final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();

        return (ignored, throwable) -> {
            if (executionVertexVersioner.isModified(requiredVertexVersion)) {
                log.debug(
                        "Refusing to deploy execution vertex {} because this deployment was "
                                + "superseded by another deployment",
                        executionVertexId);
                return null;
            }

            if (throwable == null) {
                // deploy the task
                deployTaskSafe(executionVertexId);
            } else {
                handleTaskDeploymentFailure(executionVertexId, throwable);
            }
            return null;
        };
    }
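The executionVertexVersioner.isModified(...) check above guards against stale deployments: allocateSlotsAndDeploy recorded a version for every vertex (requiredVersionByVertex in 3.8), and if the vertex has been rescheduled in the meantime (e.g. due to a failover), the recorded version no longer matches and the outdated deployment is simply dropped. A toy illustration of the idea (my own sketch, not Flink's actual ExecutionVertexVersioner):

    import java.util.HashMap;
    import java.util.Map;

    // Toy version counter illustrating the stale-deployment check;
    // Flink's real ExecutionVertexVersioner works per ExecutionVertexID.
    public class ToyVertexVersioner {
        private final Map<String, Long> currentVersions = new HashMap<>();

        // called when a vertex is (re)scheduled; bumps and records its version
        public long recordModification(String vertexId) {
            return currentVersions.merge(vertexId, 1L, Long::sum);
        }

        // a deployment created against an older version is considered stale
        public boolean isModified(String vertexId, long requiredVersion) {
            return currentVersions.getOrDefault(vertexId, 0L) != requiredVersion;
        }

        public static void main(String[] args) {
            ToyVertexVersioner versioner = new ToyVertexVersioner();
            long v1 = versioner.recordModification("vertex_0");        // deployment A created with v1
            versioner.recordModification("vertex_0");                  // vertex rescheduled -> v2
            System.out.println(versioner.isModified("vertex_0", v1));  // true -> deployment A is dropped
        }
    }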

3.12. DefaultScheduler#deployTaskSafe


    private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
        try {
            // look up the ExecutionVertex
            final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
            // deploy the ExecutionVertex
            // DefaultExecutionVertexOperations#deploy
            executionVertexOperations.deploy(executionVertex);
        } catch (Throwable e) {
            handleTaskDeploymentFailure(executionVertexId, e);
        }
    }

3.13. DefaultExecutionVertexOperations#deploy

    @Override
    public void deploy(final ExecutionVertex executionVertex) throws JobException {
        executionVertex.deploy();
    }


3.14. ExecutionVertex#deploy

    public void deploy() throws JobException {
        // delegate to the current execution attempt
        currentExecution.deploy();
    }


3.15. Execution#deploy


    /**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    public void deploy() throws JobException {
        assertRunningInJobMasterMainThread();

        // the slot previously assigned to this execution

        //    slotRequestId = {SlotRequestId@8931} "SlotRequestId{7d3611a3599a124ed703d75c55561420}"
        //    slotContext = {AllocatedSlot@8932} "AllocatedSlot e5eeb5d0e767c407ea81ab345a14ebd8 @ container_1619273419318_0017_01_000002 @ henghe-030 (dataPort=39722) - 0"
        //    slotSharingGroupId = null
        //    locality = {Locality@8933} "UNKNOWN"
        //    slotOwner = {SharedSlot@8934}
        //    releaseFuture = {CompletableFuture@8935} "java.util.concurrent.CompletableFuture@7ea60a0f[Not completed]"
        //    state = {SingleLogicalSlot$State@8936} "ALIVE"
        //    payload = {Execution@8899} "Attempt #0 (Source: Socket Stream (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@7f697d27 - [SCHEDULED]"
        //    willBeOccupiedIndefinitely = true
        final LogicalSlot slot = assignedResource;

        checkNotNull(
                slot,
                "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        // ... (the excerpt is truncated here; see the note below)
