[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, TaskManager Startup

Posted by 九师兄

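1. Overview

Reposted from: Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode [Part 4]

Previous article: [Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, JobManager Startup (YarnJobClusterEntrypoint)

Overall flow diagram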


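TaskManager startup at a glance
YarnTaskExecutorRunner: the entry class of the TaskManager in YARN mode.

  1. Start the TaskExecutor.
  2. Register its slots with the ResourceManager.
  3. The ResourceManager allocates slots.
  4. On receiving the allocation request, the TaskExecutor offers the slots to the JobMaster (its slot pool).
  5. The JobMaster submits tasks to the TaskExecutor for execution.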

2. Code Analysis

2.1. Entry point

The entry point of the TaskManager is YarnTaskExecutorRunner#main.

    /**
     * The entry point for the YARN task executor runner.
     *
     * @param args The command line arguments.
     */
    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        runTaskManagerSecurely(args);
    }

2.2. TaskManagerRunner#runTaskManagerProcessSecurely

TaskManagerRunner.runTaskManagerProcessSecurely is the method that starts the TaskManager.
It is reached from YarnTaskExecutorRunner#main through the following call chain:

YarnTaskExecutorRunner#main
			--> YarnTaskExecutorRunner#runTaskManagerSecurely
				--> TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));


    public static void runTaskManagerProcessSecurely(Configuration configuration) {
        replaceGracefulExitWithHaltIfConfigured(configuration);

        // Load plugins
        final PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        // Initialize the file systems
        FileSystem.initialize(configuration, pluginManager);

        int exitCode;
        Throwable throwable = null;

        try {
            // Install the security module
            SecurityUtils.install(new SecurityConfiguration(configuration));

            exitCode =
                    SecurityUtils.getInstalledContext()
                            // Start the TaskManager => runTaskManager
                            .runSecured(() -> runTaskManager(configuration, pluginManager));
        } catch (Throwable t) {
            throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            exitCode = FAILURE_EXIT_CODE;
        }

        if (throwable != null) {
            LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
        } else {
            LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
        }

        System.exit(exitCode);
    }
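
The shape of this method is simple: run the real work inside a guarded block and turn whatever happens into a process exit code. A minimal sketch of that pattern follows. ExitCodeSketch and runAndMapToExitCode are illustrative names, the exit code values are assumptions, and the sketch returns the code instead of calling System.exit so that it can be tried in isolation.

```java
import java.util.concurrent.Callable;

/** Illustrative only; not a Flink class. */
class ExitCodeSketch {
    static final int SUCCESS_EXIT_CODE = 0; // assumed value
    static final int FAILURE_EXIT_CODE = 1; // assumed value

    /** Runs the body and maps any Throwable to the failure exit code. */
    static int runAndMapToExitCode(Callable<Integer> body) {
        try {
            return body.call();
        } catch (Throwable t) {
            System.err.println("Terminating with exit code " + FAILURE_EXIT_CODE + ": " + t);
            return FAILURE_EXIT_CODE;
        }
    }

    public static void main(String[] args) {
        int exitCode = runAndMapToExitCode(() -> SUCCESS_EXIT_CODE);
        System.out.println("Terminating with exit code " + exitCode);
    }
}
```

The real method additionally wraps the body in the installed security context before running it.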

2.3. TaskManagerRunner#runTaskManager

This method is mainly responsible for constructing and starting the TaskManagerRunner.


    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        final TaskManagerRunner taskManagerRunner;

        try {
            // Construct the TaskManagerRunner
            taskManagerRunner =
                    new TaskManagerRunner(
                            configuration,
                            pluginManager,
                            TaskManagerRunner::createTaskExecutorService);

            // Start the TaskManagerRunner
            taskManagerRunner.start();
        } catch (Exception exception) {
            throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
        }

        try {
            return taskManagerRunner.getTerminationFuture().get().getExitCode();
        } catch (Throwable t) {
            throw new FlinkException(
                    "Unexpected failure during runtime of TaskManagerRunner.",
                    ExceptionUtils.stripExecutionException(t));
        }
    }
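
After start() returns, the method simply blocks on the termination future and unwraps any execution failure. The sketch below shows that blocking step with a plain CompletableFuture<Integer>. The class and method names are hypothetical, and e.getCause() stands in for Flink's ExceptionUtils.stripExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Illustrative only; not a Flink class. */
class TerminationFutureSketch {

    /** Blocks until the future completes and returns the exit code it carries. */
    static int awaitExitCode(CompletableFuture<Integer> terminationFuture) {
        try {
            return terminationFuture.get();
        } catch (ExecutionException e) {
            // Report the original failure, not the ExecutionException wrapper.
            throw new IllegalStateException("Unexpected failure during runtime", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for termination", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitExitCode(CompletableFuture.completedFuture(0)));
    }
}
```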

2.4. Starting the TaskManagerRunner

Within TaskManagerRunner#runTaskManager, the call to taskManagerRunner.start() eventually leads to TaskExecutor#onStart.

    public void start() throws Exception {
        taskExecutorService.start();
    }

TaskExecutorToServiceAdapter#start

    @Override
    public void start() {
        taskExecutor.start();
    }

TaskExecutor#onStart

    @Override
    public void onStart() throws Exception {
        try {
            startTaskExecutorServices();
        } catch (Throwable t) {
            final TaskManagerException exception =
                    new TaskManagerException(
                            String.format("Could not start the TaskExecutor %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }

        startRegistrationTimeout();
    }
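
Note that start() and onStart() are not the same call: as far as I understand the RPC layer, start() only signals the endpoint, and onStart() then runs on the endpoint's own main thread. The toy class below imitates that hand-off with a single-threaded executor. It is a sketch under that assumption, and none of the names are Flink APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy endpoint; start() only enqueues work, onStart() runs on the endpoint thread. */
class EndpointStartSketch {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private volatile String onStartThread;

    void start() {
        // Hand the lifecycle callback over to the endpoint's own thread.
        mainThread.execute(this::onStart);
    }

    private void onStart() {
        onStartThread = Thread.currentThread().getName();
    }

    /** Starts a fresh endpoint and reports whether onStart ran on a thread other than the caller's. */
    static boolean onStartRanOffCallerThread() {
        EndpointStartSketch endpoint = new EndpointStartSketch();
        endpoint.start();
        endpoint.mainThread.shutdown(); // already queued work still runs
        try {
            endpoint.mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return endpoint.onStartThread != null
                && !endpoint.onStartThread.equals(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        System.out.println("onStart ran on the endpoint thread: " + onStartRanOffCallerThread());
    }
}
```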

TaskExecutor#onStart then calls TaskExecutor#startTaskExecutorServices,
which connects to the ResourceManager and registers the slots.


    // Start the TaskExecutor services.
    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            // (see StandaloneLeaderRetrievalService#start)
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

            // start the job leader service
            jobLeaderService.start(
                    getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache =
                    new FileCache(
                            taskManagerConfiguration.getTmpDirectories(),
                            blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }


2.5. Registering with the ResourceManager

The resourceManagerLeaderRetriever.start call in TaskExecutor#startTaskExecutorServices
is what triggers registration with the ResourceManager.

Call chain:

resourceManagerLeaderRetriever.start
	--> StandaloneLeaderRetrievalService#start
		--> TaskExecutor#notifyLeaderAddress
			--> TaskExecutor#notifyOfNewResourceManagerLeader
				--> TaskExecutor#reconnectToResourceManager
					--> TaskExecutor#tryConnectToResourceManager
						--> TaskExecutor#connectToResourceManager

TaskExecutor#connectToResourceManager


    private void connectToResourceManager() {
        // Sanity checks
        assert (resourceManagerAddress != null);
        assert (establishedResourceManagerConnection == null);
        assert (resourceManagerConnection == null);

        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

        // Build the TaskExecutorRegistration
        final TaskExecutorRegistration taskExecutorRegistration =
                new TaskExecutorRegistration(
                        getAddress(),
                        getResourceID(),
                        unresolvedTaskManagerLocation.getDataPort(),
                        JMXService.getPort().orElse(-1),
                        hardwareDescription,
                        memoryConfiguration,
                        taskManagerConfiguration.getDefaultSlotResourceProfile(),
                        taskManagerConfiguration.getTotalResourceProfile());

        // Create the connection to the ResourceManager
        resourceManagerConnection =
                new TaskExecutorToResourceManagerConnection(
                        log,
                        getRpcService(),
                        taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                        resourceManagerAddress.getAddress(),
                        resourceManagerAddress.getResourceManagerId(),
                        getMainThreadExecutor(),
                        new ResourceManagerRegistrationListener(),
                        taskExecutorRegistration);


        // Start the connection
        resourceManagerConnection.start();
    }

RegisteredRpcConnection#start is responsible for establishing the connection.

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    public void start() {
        checkState(!closed, "The RPC connection is already closed");
        checkState(
                !isConnected() && pendingRegistration == null,
                "The RPC connection is already started");

        // Create the new registration (delegates to generateRegistration)
        final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            // No other registration was pending, so start this one
            newRegistration.startRegistration();
        } else {
            // concurrent start operation
            newRegistration.cancel();
        }
    }
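
The compareAndSet guard is what makes a concurrent second start() harmless: only the caller that swaps null for its own registration proceeds, and the loser cancels. Below is a small sketch of the same guard using AtomicReferenceFieldUpdater from the JDK. The class, the field name, and the use of String in place of a registration object are all illustrative.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Illustrative only; a String stands in for the pending registration object. */
class CasStartSketch {
    private static final AtomicReferenceFieldUpdater<CasStartSketch, String> PENDING_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(CasStartSketch.class, String.class, "pending");

    // The updater requires a volatile, non-static field.
    private volatile String pending;

    /** Returns true only for the first caller; later callers lose the race and should cancel. */
    boolean tryStart(String newRegistration) {
        return PENDING_UPDATER.compareAndSet(this, null, newRegistration);
    }

    String pending() {
        return pending;
    }

    public static void main(String[] args) {
        CasStartSketch connection = new CasStartSketch();
        System.out.println(connection.tryStart("first"));
        System.out.println(connection.tryStart("second"));
        System.out.println(connection.pending());
    }
}
```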

RegisteredRpcConnection#createNewRegistration


    // ------------------------------------------------------------------------
    //  Internal methods
    // ------------------------------------------------------------------------

    private RetryingRegistration<F, G, S> createNewRegistration() {

        // Generate the registration via the subclass's generateRegistration method
        RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());

        CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();

        future.whenCompleteAsync(
                (Tuple2<G, S> result, Throwable failure) -> {
                    if (failure != null) {
                        if (failure instanceof CancellationException) {
                            // we ignore cancellation exceptions because they originate from
                            // cancelling
                            // the RetryingRegistration
                            log.debug(
                                    "Retrying registration towards {} was cancelled.",
                                    targetAddress);
                        } else {
                            // this future should only ever fail if there is a bug, not if the
                            // registration is declined
                            onRegistrationFailure(failure);
                        }
                    } else {
                        // Registration succeeded
                        targetGateway = result.f0;
                        onRegistrationSuccess(result.f1);
                    }
                },
                executor);

        return newRegistration;
    }
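
The callback above separates three outcomes: cancellation (ignored), any other failure (treated as a bug), and success. A compact sketch of that three-way split on a CompletableFuture follows. It uses handle plus join instead of whenCompleteAsync so that the result can be inspected directly, and the class and method names are made up for the example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Illustrative only; not a Flink class. */
class CompletionHandlingSketch {

    /** Classifies an already completed future the way the registration callback does. */
    static String classify(CompletableFuture<String> future) {
        return future
                .handle(
                        (result, failure) -> {
                            if (failure != null) {
                                if (failure instanceof CancellationException) {
                                    // Cancellation is expected and only logged.
                                    return "cancelled";
                                }
                                return "failed: " + failure.getMessage();
                            }
                            return "registered: " + result;
                        })
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true);
        System.out.println(classify(cancelled));
        System.out.println(classify(CompletableFuture.completedFuture("rm-gateway")));
    }
}
```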

TaskExecutorToResourceManagerConnection#generateRegistration
creates the registration object.

    @Override
    protected RetryingRegistration<
                    ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess>
            generateRegistration() {
        // Build the TaskExecutorToResourceManagerConnection.ResourceManagerRegistration
        return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
                log,
                rpcService,
                getTargetAddress(),
                getTargetLeaderId(),
                retryingRegistrationConfiguration,
                taskExecutorRegistration);
    }

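2.6. RetryingRegistration#startRegistration

Once the new registration has been installed by the compare-and-set, startRegistration is called. It does the following:

  • connect: resolve the target address into a gateway
  • register: register with the ResourceManager
  • startRegistrationLater: retry after a delay if resolving the address fails

    /**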
     * This method resolves the target address to a callable gateway and starts the registration
     * after that.
     */
    @SuppressWarnings("unchecked")
    public void startRegistration() {
        if (canceled) {
            // we already got canceled
            return;
        }

        try {
            // trigger resolution of the target address to a callable gateway
            final CompletableFuture<G> rpcGatewayFuture;

            if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
                rpcGatewayFuture =
                        (CompletableFuture<G>)

                                // Establish the connection
                                rpcService.connect(
                                        targetAddress,
                                        fencingToken,
                                        targetType.asSubclass(FencedRpcGateway.class));
            } else {
                rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
            }

            // upon success, start the registration attempts
            CompletableFuture<Void> rpcGatewayAcceptFuture =
                    rpcGatewayFuture.thenAcceptAsync(
                            (G rpcGateway) -> {
                                log.info("Resolved {} address, beginning registration", targetName);

                                // Perform the registration
                                register(
                                        rpcGateway,
                                        1,
                                        retryingRegistrationConfiguration
                                                .getInitialRegistrationTimeoutMillis());
                            },
                            rpcService.getExecutor());

            // upon failure, retry, unless this is cancelled
            rpcGatewayAcceptFuture.whenCompleteAsync(
                    (Void v, Throwable failure) -> {
                        if (failure != null && !canceled) {
                            final Throwable strippedFailure =
                                    ExceptionUtils.stripCompletionException(failure);
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "Could not resolve {} address {}, retrying in {} ms.",
                                        targetName,
                                        targetAddress,
                                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                                        strippedFailure);
                            } else {
                                log.info(
                                        "Could not resolve {} address {}, retrying in {} ms: {}",
                                        targetName,
                                        targetAddress,
                                        retryingRegistrationConfiguration.getErrorDelayMillis(),
                                        strippedFailure.getMessage());
                            }

                            // Retry the registration after the error delay
                            startRegistrationLater(
                                    retryingRegistrationConfiguration.getErrorDelayMillis());
                        }
                    },
                    rpcService.getExecutor());
        } catch (Throwable t) {
            completionFuture.completeExceptionally(t);
            cancel();
        }
    }
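
The connect, register, retry-later loop can be imitated with nothing more than a ScheduledExecutorService. The sketch below retries a failing "connect" after a fixed delay until it succeeds and counts the attempts. Every name in it is hypothetical, the 10 ms delay is arbitrary, and it leaves out the cancellation flag and the actual RPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Illustrative only; not a Flink class. */
class RetryLaterSketch {

    /** Tries to connect; on failure schedules another attempt after delayMillis. */
    static <T> void attempt(
            Supplier<CompletableFuture<T>> connect,
            long delayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result,
            AtomicInteger attempts) {
        if (result.isDone()) {
            return; // already completed or cancelled
        }
        attempts.incrementAndGet();
        connect.get()
                .whenComplete(
                        (value, failure) -> {
                            if (failure == null) {
                                result.complete(value);
                            } else if (!result.isDone()) {
                                scheduler.schedule(
                                        () -> attempt(connect, delayMillis, scheduler, result, attempts),
                                        delayMillis,
                                        TimeUnit.MILLISECONDS);
                            }
                        });
    }

    /** Simulates a target that is unreachable for the first N attempts. */
    static int attemptsUntilSuccess(int failuresBeforeSuccess) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        AtomicInteger remainingFailures = new AtomicInteger(failuresBeforeSuccess);
        CompletableFuture<String> result = new CompletableFuture<>();
        Supplier<CompletableFuture<String>> connect =
                () -> {
                    CompletableFuture<String> gateway = new CompletableFuture<>();
                    if (remainingFailures.getAndDecrement() > 0) {
                        gateway.completeExceptionally(new IllegalStateException("unreachable"));
                    } else {
                        gateway.complete("gateway");
                    }
                    return gateway;
                };
        try {
            attempt(connect, 10L, scheduler, result, attempts);
            result.join();
            return attempts.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsUntilSuccess(2));
    }
}
```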

RetryingRegistration#register calls invokeRegistration. For the TaskExecutor, the implementation lives in TaskExecutorToResourceManagerConnection:

        // Called from RetryingRegistration#register
        // when a registration attempt starts
        @Override
        protected CompletableFuture<RegistrationResponse> invokeRegistration(
                ResourceManagerGateway resourceManager,
                ResourceManagerId fencingToken,
                long timeoutMillis)
                throws Exception {

            Time timeout = Time.milliseconds(timeoutMillis);

            // Register the TaskExecutor via ResourceManager#registerTaskExecutor
            return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);
        }

The relevant fragment of RetryingRegistration#register, which calls invokeRegistration:

            // [Key point] Registration calls invokeRegistration,
            // i.e. TaskExecutorToResourceManagerConnection#invokeRegistration
            CompletableFuture<RegistrationResponse> registrationFuture =
                    invokeRegistration(gateway, fencingToken, timeoutMillis);
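
This is a template-method arrangement: the base class owns the registration flow, and the subclass only supplies the concrete RPC. A stripped-down sketch of that split is shown here. FakeResourceManagerGateway, RegistrationSketch and TaskExecutorRegistrationSketch are hypothetical stand-ins, and the retry and timeout handling of the real class is omitted.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a ResourceManager gateway. */
interface FakeResourceManagerGateway {
    CompletableFuture<String> registerTaskExecutor(String taskExecutorId, long timeoutMillis);
}

/** The base class drives the flow and leaves the actual call to subclasses. */
abstract class RegistrationSketch<G> {
    protected abstract CompletableFuture<String> invokeRegistration(G gateway, long timeoutMillis);

    final String register(G gateway, long timeoutMillis) {
        // The real flow also handles timeouts and retries; this sketch just waits.
        return invokeRegistration(gateway, timeoutMillis).join();
    }
}

/** The subclass knows which gateway method performs the registration. */
class TaskExecutorRegistrationSketch extends RegistrationSketch<FakeResourceManagerGateway> {
    private final String taskExecutorId;

    TaskExecutorRegistrationSketch(String taskExecutorId) {
        this.taskExecutorId = taskExecutorId;
    }

    @Override
    protected CompletableFuture<String> invokeRegistration(
            FakeResourceManagerGateway gateway, long timeoutMillis) {
        return gateway.registerTaskExecutor(taskExecutorId, timeoutMillis);
    }

    static String demo() {
        FakeResourceManagerGateway gateway =
                (id, timeout) -> CompletableFuture.completedFuture("registered " + id);
        return new TaskExecutorRegistrationSketch("tm-1").register(gateway, 100L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```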

ResourceManager#registerTaskExecutor
registers the TaskExecutor with the ResourceManager:

  • Connect to the TaskExecutor
  • Add it to the cache
  • Start the actual registration ==> ResourceManager#registerTaskExecutorInternal
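
The last step has to cope with a TaskExecutor that registers again under the same resource ID: the stale entry is removed and cleaned up before the new one takes its place. A minimal sketch of that bookkeeping with a plain HashMap follows. The class is hypothetical, strings stand in for the registration objects, and the final put is an assumption about what follows the cleanup.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only; not the ResourceManager's actual data structure. */
class TaskExecutorRegistrySketch {
    private final Map<String, String> taskExecutors = new HashMap<>();
    private int staleRegistrationsCleaned;

    /** Registers instanceId under resourceId and returns the replaced instance id, or null. */
    String register(String resourceId, String instanceId) {
        String oldInstanceId = taskExecutors.remove(resourceId);
        if (oldInstanceId != null) {
            // Stand-in for unregistering the old instance from the slot manager.
            staleRegistrationsCleaned++;
        }
        taskExecutors.put(resourceId, instanceId); // assumed follow-up step
        return oldInstanceId;
    }

    String current(String resourceId) {
        return taskExecutors.get(resourceId);
    }

    int staleRegistrationsCleaned() {
        return staleRegistrationsCleaned;
    }

    public static void main(String[] args) {
        TaskExecutorRegistrySketch registry = new TaskExecutorRegistrySketch();
        registry.register("tm-1", "instance-a");
        System.out.println("replaced: " + registry.register("tm-1", "instance-b"));
    }
}
```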

    /**
     * Registers a new TaskExecutor.
     *
     * @param taskExecutorRegistration task executor registration parameters
     * @return RegistrationResponse
     */
    private RegistrationResponse registerTaskExecutorInternal(
            TaskExecutorGateway taskExecutorGateway,
            TaskExecutorRegistration taskExecutorRegistration) {


        // Get the ResourceID of the TaskExecutor
        ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();


        // Remove any cached registration for this ResourceID
        WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);

        if (oldRegistration != null) {
            // Clean up the old registration


            // TODO :: suggest old taskExecutor to stop itself
            log.debug(
                    "Replacing old registration of TaskExecutor {}.",
                    taskExecutorResourceId.getStringWithMetadata());

            // remove old task manager registration from slot manager
            slotManager.unregisterTaskManager(
                    oldRegistration.getInstanceID
