[Flink] Flink 1.12.2 Source Code Analysis: TaskExecutor

Posted by 九师兄


Preface: this article was compiled by the editors of 小常识网 (cha138.com) and covers the Flink 1.12.2 TaskExecutor source code.

1. Overview

Reposted from: Flink 1.12.2 Source Code Analysis: TaskExecutor

TaskExecutor is the concrete implementation of the TaskManager.

2. TaskExecutorGateway

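We begin with the RPC gateway that TaskExecutor implements, TaskExecutorGateway, and the operations it exposes. The interface methods are listed below.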

2.1. Class Diagram

2.2. Interface Methods

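Method | Description
--- | ---
CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, @RpcTimeout Time timeout) | Requests a slot from the TaskManager.
requestTaskBackPressure | Requests back-pressure information for a task.
CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout) | [Core] Submits a task.
updatePartitions | Updates the partition information of a task.
releaseOrPromotePartitions | Releases or promotes intermediate result partitions in batch.
releaseClusterPartitions | Releases all cluster partitions that belong to any of the given data sets.
triggerCheckpoint | Triggers a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp.
confirmCheckpoint | Confirms a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp.
abortCheckpoint | Aborts a checkpoint.
cancelTask | Cancels a task.
heartbeatFromJobManager | Heartbeat request from the JobManager.
heartbeatFromResourceManager | Heartbeat request from the ResourceManager.
disconnectJobManager | Disconnects the given JobManager from the TaskManager.
disconnectResourceManager | Disconnects the given ResourceManager from the TaskManager.
freeSlot | Frees a slot.
requestFileUploadByType | Requests that the file of the given type be uploaded to the cluster's BlobServer.
requestFileUploadByName | Requests that the file with the given name be uploaded to the cluster's BlobServer.
requestMetricQueryServiceAddress | Returns the gateway of the metric query service on the TaskManager.
canBeReleased | Checks whether the task executor can be released. It cannot be released while there are unconsumed result partitions.
requestLogList | Requests the names of the historical log files on the TaskManager.
sendOperatorEventToTask | Sends an operator event to a task.
requestThreadDump | Requests a thread dump of the TaskManager.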
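Methods such as requestSlot, submitTask and freeSlot return a CompletableFuture rather than a plain value. The calls cross Flink's asynchronous RPC layer, so the caller must never block on them. The sketch below shows that calling style and nothing more. MiniTaskExecutorGateway, its String IDs and the InMemoryGateway implementation are hypothetical illustrations, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical gateway (not Flink's API): calls return futures instead of blocking,
// mirroring the CompletableFuture<Acknowledge>-returning methods in the list above.
interface MiniTaskExecutorGateway {
    CompletableFuture<String> requestSlot(int slotNumber, String allocationId);

    CompletableFuture<String> freeSlot(String allocationId);
}

// In-memory implementation that exists only to make the sketch runnable.
class InMemoryGateway implements MiniTaskExecutorGateway {
    private final Map<Integer, String> allocationBySlot = new HashMap<>();

    @Override
    public CompletableFuture<String> requestSlot(int slotNumber, String allocationId) {
        if (allocationBySlot.containsKey(slotNumber)) {
            // Errors travel inside the future rather than being thrown at the caller.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("slot " + slotNumber + " is occupied"));
        }
        allocationBySlot.put(slotNumber, allocationId);
        return CompletableFuture.completedFuture("ACK");
    }

    @Override
    public CompletableFuture<String> freeSlot(String allocationId) {
        // Removes the mapping whose value is this allocation id, if there is one.
        allocationBySlot.values().remove(allocationId);
        return CompletableFuture.completedFuture("ACK");
    }
}
```

A caller composes on the returned futures, for example `gateway.freeSlot("a1").thenCompose(ack -> gateway.requestSlot(0, "a2"))`. It reacts to failures with `exceptionally(...)` instead of try/catch.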

3. Code Analysis

3.1. Fields

3.1.1. Services

    // HA
    /** The access to the leader election and retrieval services. */
    private final HighAvailabilityServices haServices;

    // TaskExecutor services such as MemoryManager, IOManager, ShuffleEnvironment, etc.
    private final TaskManagerServices taskExecutorServices;

    /**
     * The task manager configuration.
     * */
    private final TaskManagerConfiguration taskManagerConfiguration;

    /** The fatal error handler to use in case of a fatal error. */
    private final FatalErrorHandler fatalErrorHandler;

    // The BLOB cache provides access to the BLOB services for permanent and transient BLOBs.
    private final BlobCacheService blobCacheService;

    private final LibraryCacheManager libraryCacheManager;

    /** The address to metric query service on this Task Manager. */
    @Nullable private final String metricQueryServiceAddress;


3.1.2. TaskManager Services


    /** The connection information of this task manager. */
    private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;

    private final TaskManagerMetricGroup taskManagerMetricGroup;

    /** The state manager for this task, providing state managers per slot. */
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;

    /** Information provider for external resources. */
    private final ExternalResourceInfoProvider externalResourceInfoProvider;

    /** The network component in the task manager. */
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;

    /** The kvState registration service in the task manager. */
    private final KvStateService kvStateService;

    private final Executor ioExecutor;


3.1.3. Task Slot Allocation Table

    private final TaskSlotTable<Task> taskSlotTable;

    private final JobTable jobTable;

    private final JobLeaderService jobLeaderService;

    private final LeaderRetrievalService resourceManagerLeaderRetriever;
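The jobTable field keeps one entry per job. In requestSlot (section 3.2.1) it is reached through getOrCreateJob(jobId, supplier), so a job's services are built only on the first slot request for that job. That call sits inside try/catch (Exception e), which means the factory may fail. The sketch below shows this lazy get-or-create pattern. It assumes String job IDs and a plain Supplier, and MiniJobTable is a hypothetical class rather than Flink's JobTable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical, simplified job table: one entry per job, created lazily.
class MiniJobTable {
    static final class Job {
        final String jobId;
        final String services; // placeholder for the per-job services
        boolean connected;      // true once a JobManager leader is connected

        Job(String jobId, String services) {
            this.jobId = jobId;
            this.services = services;
        }
    }

    private final Map<String, Job> jobs = new HashMap<>();

    // Returns the existing entry, or creates it with the factory on first use.
    // If the factory throws, computeIfAbsent records nothing and the exception propagates.
    Job getOrCreateJob(String jobId, Supplier<String> servicesFactory) {
        return jobs.computeIfAbsent(jobId, id -> new Job(id, servicesFactory.get()));
    }

    Optional<Job> getJob(String jobId) {
        return Optional.ofNullable(jobs.get(jobId));
    }

    int size() {
        return jobs.size();
    }
}
```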


3.1.4. ResourceManager-Related

    // ResourceManager-related
    @Nullable private ResourceManagerAddress resourceManagerAddress;

    @Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    @Nullable private TaskExecutorToResourceManagerConnection resourceManagerConnection;

    @Nullable private UUID currentRegistrationTimeoutId;

    private Map<JobID, Collection<CompletableFuture<ExecutionState>>>
            taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);


3.1.5. Others


    // Hardware description
    private final HardwareDescription hardwareDescription;

    // Memory configuration
    private final TaskExecutorMemoryConfiguration memoryConfiguration;

    // File cache
    private FileCache fileCache;

    // JobManager heartbeat
    /** The heartbeat manager for job manager in the task manager. */
    private final HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload>
            jobManagerHeartbeatManager;

    // ResourceManager heartbeat
    /** The heartbeat manager for resource manager in the task manager. */
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload>
            resourceManagerHeartbeatManager;

    // Partition tracking
    private final TaskExecutorPartitionTracker partitionTracker;

    // Back-pressure sampling
    private final BackPressureSampleService backPressureSampleService;


3.2. Core Methods

3.2.1. requestSlot

The SlotManager inside the ResourceManager calls the requestSlot RPC to request a slot from the TaskExecutor.

org.apache.flink.runtime.taskexecutor.TaskExecutor#requestSlot

    @Override
    public CompletableFuture<Acknowledge> requestSlot(
            final SlotID slotId,
            final JobID jobId,
            final AllocationID allocationId,
            final ResourceProfile resourceProfile,
            final String targetAddress,
            final ResourceManagerId resourceManagerId,
            final Time timeout) {
        // TODO: Filter invalid requests from the resource manager by using the
        // instance/registration Id

        // Log the request, for example:
        //   Receive slot request 3755cb8f9962a9a7738db04f2a02084c for job
        //   694474d11da6100e82744c9e47e2f511 from resource manager with leader id
        //   00000000000000000000000000000000.
        log.info(
                "Receive slot request {} for job {} from resource manager with leader id {}.",
                allocationId,
                jobId,
                resourceManagerId);

        // Check that we are connected to the ResourceManager with this leader id
        if (!isConnectedToResourceManager(resourceManagerId)) {
            final String message =
                    String.format(
                            "TaskManager is not connected to the resource manager %s.",
                            resourceManagerId);
            log.debug(message);
            return FutureUtils.completedExceptionally(new TaskManagerException(message));
        }

        try {
            // [Key step] allocate the slot
            allocateSlot(slotId, jobId, allocationId, resourceProfile);
        } catch (SlotAllocationException sae) {
            return FutureUtils.completedExceptionally(sae);
        }

        final JobTable.Job job;

        try {
            // Get or create the JobTable.Job entry for this job
            job =
                    jobTable.getOrCreateJob(
                            jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
        } catch (Exception e) {
            // free the allocated slot
            try {
                taskSlotTable.freeSlot(allocationId);
            } catch (SlotNotFoundException slotNotFoundException) {
                // slot no longer existent, this should actually never happen, because we've
                // just allocated the slot. So let's fail hard in this case!
                onFatalError(slotNotFoundException);
            }

            // release local state under the allocation id.
            localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

            // sanity check
            if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                onFatalError(new Exception("Could not free slot " + slotId));
            }

            return FutureUtils.completedExceptionally(
                    new SlotAllocationException("Could not create new job.", e));
        }

        if (job.isConnected()) {
            // [Important] offer the slots to the JobManager
            offerSlotsToJobManager(jobId);
        }

        return CompletableFuture.completedFuture(Acknowledge.get());
    }
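The control flow of requestSlot has four steps:

1. Check the ResourceManager leader id.
2. Allocate the slot.
3. Get or create the job entry, rolling the allocation back if that fails.
4. Offer slots immediately, but only if the JobManager is already connected.

The sketch below models those steps with plain maps. RequestSlotFlow and all of its fields are hypothetical. Failures come back as failed futures, as the original does with FutureUtils.completedExceptionally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of TaskExecutor#requestSlot's control flow (not Flink code).
class RequestSlotFlow {
    String connectedRmLeaderId;                            // null = not connected to any RM
    final Map<Integer, String> slots = new HashMap<>();    // slot number -> allocation id
    final Map<String, Boolean> jobs = new HashMap<>();     // job id -> JobManager connected?
    Function<String, Boolean> jobFactory = jobId -> false; // creates a job entry; may throw
    int offerCount;                                        // times slots were offered to a JM

    CompletableFuture<String> requestSlot(
            int slot, String jobId, String allocationId, String rmLeaderId) {
        // 1. Only the ResourceManager we are connected to may request slots.
        if (connectedRmLeaderId == null || !connectedRmLeaderId.equals(rmLeaderId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Not connected to resource manager " + rmLeaderId));
        }
        // 2. Allocate the slot; a repeated request for the same allocation is accepted.
        String current = slots.get(slot);
        if (current == null) {
            slots.put(slot, allocationId);
        } else if (!current.equals(allocationId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Slot " + slot + " is already allocated"));
        }
        // 3. Get or create the job entry; if creation fails, free the slot again.
        boolean jobManagerConnected;
        try {
            jobManagerConnected = jobs.computeIfAbsent(jobId, jobFactory);
        } catch (RuntimeException e) {
            slots.remove(slot);
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Could not create new job.", e));
        }
        // 4. Offer slots right away only if the JobManager connection already exists.
        if (jobManagerConnected) {
            offerCount++;
        }
        return CompletableFuture.completedFuture("ACK");
    }
}
```

In this model, a duplicate request for the same allocation succeeds without allocating twice, so a retried request is harmless.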

org.apache.flink.runtime.taskexecutor.TaskExecutor#allocateSlot


    private void allocateSlot(
            SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile)
            throws SlotAllocationException {

        // Sample argument values captured in a debugger for one request:
        //    slotId = {SlotID@6055} "container_1619273419318_0032_01_000002_0"
        //        resourceId = {ResourceID@6114} "container_1619273419318_0032_01_000002"
        //        slotNumber = 0
        //    jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"
        //    allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"
        //    resourceProfile = {ResourceProfile@6058} "ResourceProfile{UNKNOWN}"
        //        cpuCores = null
        //        taskHeapMemory = null
        //        taskOffHeapMemory = null
        //        managedMemory = null
        //        networkMemory = null
        //        extendedResources = {HashMap@6116}  size = 0
        //
        //    taskSlotTable = {TaskSlotTableImpl@6077}
        //        numberSlots = 4
        //        defaultSlotResourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //            cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"
        //            taskHeapMemory = {MemorySize@6140} "100663293 bytes"
        //            taskOffHeapMemory = {MemorySize@6141} "0 bytes"
        //            managedMemory = {MemorySize@6142} "134217730 bytes"
        //            networkMemory = {MemorySize@6143} "32 mb"
        //            extendedResources = {HashMap@6144}  size = 0
        //        memoryPageSize = 32768
        //        timerService = {TimerService@6125}
        //        taskSlots = {HashMap@6126}  size = 0
        //        allocatedSlots = {HashMap@6127}  size = 0
        //        taskSlotMappings = {HashMap@6128}  size = 0
        //        slotsPerJob = {HashMap@6129}  size = 0
        //        slotActions = {TaskExecutor$SlotActionsImpl@6130}
        //        state = {TaskSlotTableImpl$State@6131} "RUNNING"
        //        budgetManager = {ResourceBudgetManager@6132}
        //        closingFuture = {CompletableFuture@6133} "java.util.concurrent.CompletableFuture@9a6e076[Not completed]"
        //        mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@6096}
        //        memoryVerificationExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"

        if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            // Perform the allocation: TaskSlotTableImpl#allocateSlot
            if (taskSlotTable.allocateSlot(
                    slotId.getSlotNumber(),
                    jobId,
                    allocationId,
                    resourceProfile,
                    taskManagerConfiguration.getTimeout())) {
                // Allocated slot for 3755cb8f9962a9a7738db04f2a02084c.
                log.info("Allocated slot for {}.", allocationId);
            } else {
                log.info("Could not allocate slot for {}.", allocationId);
                throw new SlotAllocationException("Could not allocate slot.");
            }
        } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
            final String message =
                    "The slot " + slotId + " has already been allocated for a different job.";

            log.info(message);

            final AllocationID allocationID =
                    taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
            throw new SlotOccupiedException(
                    message, allocationID, taskSlotTable.getOwningJob(allocationID));
        }
    }
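allocateSlot distinguishes three cases:

- The slot is free: allocate it.
- The slot already holds exactly this job and allocation: this is a duplicate request, so nothing happens.
- The slot belongs to someone else: throw SlotOccupiedException, carrying the current allocation and its owning job.

The sketch below models that decision logic. MiniSlotTable and its nested exception are hypothetical stand-ins for TaskSlotTable and SlotOccupiedException. It leaves out the case where TaskSlotTable#allocateSlot returns false.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskSlotTable, reduced to the checks allocateSlot uses.
class MiniSlotTable {
    // Mirrors SlotOccupiedException: reports who currently owns the slot.
    static class SlotOccupiedException extends Exception {
        final String currentAllocationId;
        final String owningJobId;

        SlotOccupiedException(String message, String currentAllocationId, String owningJobId) {
            super(message);
            this.currentAllocationId = currentAllocationId;
            this.owningJobId = owningJobId;
        }
    }

    private final Map<Integer, String> allocationBySlot = new HashMap<>();
    private final Map<String, String> jobByAllocation = new HashMap<>();

    boolean isSlotFree(int slot) {
        return !allocationBySlot.containsKey(slot);
    }

    boolean isAllocated(int slot, String jobId, String allocationId) {
        return allocationId.equals(allocationBySlot.get(slot))
                && jobId.equals(jobByAllocation.get(allocationId));
    }

    void allocateSlot(int slot, String jobId, String allocationId) throws SlotOccupiedException {
        if (isSlotFree(slot)) {
            // Case 1: free slot -> record the allocation and its owning job.
            allocationBySlot.put(slot, allocationId);
            jobByAllocation.put(allocationId, jobId);
        } else if (!isAllocated(slot, jobId, allocationId)) {
            // Case 3: held by a different allocation -> report the current owner.
            String current = allocationBySlot.get(slot);
            throw new SlotOccupiedException(
                    "The slot " + slot + " has already been allocated for a different job.",
                    current,
                    jobByAllocation.get(current));
        }
        // Case 2: this job and allocation already hold the slot -> duplicate request, no-op.
    }
}
```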

org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager


    // ------------------------------------------------------------------------
    //  Internal job manager connection methods
    // ------------------------------------------------------------------------

    private void offerSlotsToJobManager(final JobID jobId) {
        // Offer the slots to the JobManager via internalOfferSlotsToJobManager
        jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
    }


    private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
        // Get the JobID
        final JobID jobId = jobManagerConnection.getJobId();

        // Are any slots allocated to this job?
        if (taskSlotTable.hasAllocatedSlots(jobId)) {

            // Offer reserved slots to the leader of job 694474d11da6100e82744c9e47e2f511.
            log.info("Offer reserved slots to the leader of job {}.", jobId);

            // Get the JobMaster gateway
            final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

            // Get all TaskSlots allocated to jobId
            final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);

            // Get the JobMasterId
            final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

            // Slot offers built from the reserved slots
            final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

            while (reservedSlotsIterator.hasNext()) {
                SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                reservedSlots.add(offer);
            }

            // JobMaster#offerSlots: offers the given slots to the job manager.
            // The response contains the set of accepted slots.
            CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
                    jobMasterGateway.offerSlots(
                            getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());

            // Handle the response asynchronously on the main thread:
            // deal with failures, or mark the accepted slots as active.
            acceptedSlotsFuture.whenCompleteAsync(
                    handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
                    getMainThreadExecutor());
        } else {
            log.debug("There are no unassigned slots for the job {}.", jobId);
        }
    }
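internalOfferSlotsToJobManager collects the job's reserved slots and offers them through JobMasterGateway#offerSlots. It then processes the reply with whenCompleteAsync on the main-thread executor, which keeps the slot bookkeeping single-threaded. The sketch below models that round trip with String allocation IDs. SlotOfferFlow is hypothetical, and the JobMaster is represented by a plain function. The sketch marks accepted slots active and frees the rest, including all offered slots when the RPC fails. The real handleAcceptedSlotOffers, which this article does not show, may treat failures differently.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical model of offering reserved slots to a JobManager (not Flink code).
class SlotOfferFlow {
    final Set<String> reserved = new HashSet<>(); // allocated, waiting for the JobManager
    final Set<String> active = new HashSet<>();   // accepted by the JobManager
    final Set<String> freed = new HashSet<>();    // rejected or failed, returned to the pool

    // jobMaster stands in for JobMasterGateway#offerSlots: offered ids -> accepted ids.
    // mainThread plays the role of the endpoint's main-thread executor.
    CompletableFuture<Void> offerSlots(
            Function<Collection<String>, CompletableFuture<Collection<String>>> jobMaster,
            Executor mainThread) {
        if (reserved.isEmpty()) {
            return CompletableFuture.completedFuture(null); // nothing to offer
        }
        Set<String> offered = new HashSet<>(reserved);
        return jobMaster
                .apply(offered)
                .whenCompleteAsync(
                        (accepted, failure) -> {
                            for (String id : offered) {
                                reserved.remove(id);
                                if (failure == null && accepted.contains(id)) {
                                    active.add(id); // accepted: mark as active
                                } else {
                                    freed.add(id); // rejected, or the offer failed
                                }
                            }
                        },
                        mainThread)
                // Drop the payload; the result still completes exceptionally on failure.
                .thenAccept(accepted -> { });
    }
}
```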

3.2.2. freeSlot

Frees the slot with the given allocation ID.

    @Override
    public CompletableFuture<Acknowledge> freeSlot(
            AllocationID allocationId, Throwable cause, Time timeout) {
        freeSlotInternal(allocationId, cause);

        return CompletableFuture.completedFuture(Acknowledge.get());
    }
	
	
    private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
        checkNotNull(allocationId);

        log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());

        try {
            final JobID jobId = taskSlotTable.getOwningJob(allocationId);

            // Free the slot. Returns the index of the freed slot, or -1 if it could not be freed yet.
            final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

            if (slotIndex != -1) {

                if (isConnectedToResourceManager()) {
                    // Get the ResourceManager gateway
                    // the slot was freed. Tell the RM about it
                    ResourceManagerGateway resourceManagerGateway =
                            establishedResourceManagerConnection.getResourceManagerGateway();
                    // Notify the RM that the slot is available again
                    resourceManagerGateway.notifySlotAvailable(
                            establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                            new SlotID(getResourceID(), slotIndex),
                            allocationId);
                }

                if (jobId != null) {
                    closeJobManagerConnectionIfNoAllocatedResources(jobId);
                }
            }
        } catch (SlotNotFoundException e) {
            log.debug("Could not free slot for allocation id {}.", allocationId, e);
        }

        // Release the local state stored under this allocation id
        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
    }
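freeSlotInternal notifies the ResourceManager only when the slot table returns a real slot index. A return value of -1 means the slot could not be freed yet. An unknown allocation ID (SlotNotFoundException) is merely logged, and local state is released in every case. The sketch below models that bookkeeping. FreeSlotFlow, its maps, and the rule that a slot with running tasks returns -1 are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of TaskExecutor#freeSlotInternal's bookkeeping (not Flink code).
class FreeSlotFlow {
    final Map<String, Integer> slotByAllocation = new HashMap<>();    // allocation id -> slot index
    final Map<Integer, Integer> runningTasksBySlot = new HashMap<>(); // slot index -> task count
    final List<String> rmNotifications = new ArrayList<>();
    final List<String> releasedLocalState = new ArrayList<>();
    boolean connectedToResourceManager = true;

    // Stand-in for the slot table's freeSlot: returns the freed index, or -1 if the slot
    // still runs tasks (assumption of this sketch). Unknown ids mimic SlotNotFoundException.
    int freeSlotInTable(String allocationId) {
        Integer slot = slotByAllocation.get(allocationId);
        if (slot == null) {
            throw new IllegalArgumentException("No slot for allocation " + allocationId);
        }
        if (runningTasksBySlot.getOrDefault(slot, 0) > 0) {
            return -1;
        }
        slotByAllocation.remove(allocationId);
        return slot;
    }

    void freeSlotInternal(String allocationId) {
        try {
            int slotIndex = freeSlotInTable(allocationId);
            if (slotIndex != -1 && connectedToResourceManager) {
                // Tell the RM that this slot can be handed out again.
                rmNotifications.add("slot " + slotIndex + " available (" + allocationId + ")");
            }
        } catch (IllegalArgumentException e) {
            // Unknown allocation: the original only logs this at debug level.
        }
        // Local state is released regardless of the outcome above.
        releasedLocalState.add(allocationId);
    }
}
```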


3.2.3. submitTask

At its core, submitTask builds a Task and hands it to a dedicated thread for execution.
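Before building the Task, submitTask rejects the submission in two cases: the JobMasterId does not match the current JobManager connection, or the slot cannot be marked active. The sketch below models those two checks and the final step of running the task on its own thread. SubmitTaskFlow and MiniTask are hypothetical. Like Flink's Task class, MiniTask implements Runnable and owns its executing thread, but the real Task's construction and start-up are not shown in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of TaskExecutor#submitTask (not Flink code).
class SubmitTaskFlow {
    // A task is a Runnable that owns the thread executing it.
    static final class MiniTask implements Runnable {
        private final Runnable invokable; // the operator code to run
        final Thread executingThread;

        MiniTask(String taskName, Runnable invokable) {
            this.invokable = invokable;
            this.executingThread = new Thread(this, taskName);
        }

        @Override
        public void run() {
            invokable.run();
        }

        void start() {
            executingThread.start();
        }
    }

    String currentJobMasterId = "jm-1";                      // leader id of the job's JobManager
    final Map<String, Boolean> slotActive = new HashMap<>(); // allocation id -> active?

    MiniTask submitTask(String taskName, String allocationId, String jobMasterId, Runnable invokable) {
        // Check 1: the submission must come from the current JobManager leader.
        if (!Objects.equals(currentJobMasterId, jobMasterId)) {
            throw new IllegalStateException("Rejecting the task submission: leader id "
                    + jobMasterId + " does not match " + currentJobMasterId);
        }
        // Check 2: the slot must have been allocated; replace() returns null if it was not.
        if (slotActive.replace(allocationId, true) == null) {
            throw new IllegalStateException("No task slot allocated for allocation ID " + allocationId);
        }
        // Build the task and hand it to its own thread.
        MiniTask task = new MiniTask(taskName, invokable);
        task.start();
        return task;
    }
}
```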


    // ----------------------------------------------------------------------
    // Task lifecycle RPCs
    // Submitting a task
    // ----------------------------------------------------------------------

    @Override
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {

        try {
            final JobID jobId = tdd.getJobId();
            final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();

            final JobTable.Connection jobManagerConnection =
                    jobTable.getConnection(jobId)
                            .orElseThrow(
                                    () -> {
                                        final String message =
                                                "Could not submit task because there is no JobManager "
                                                        + "associated for the job "
                                                        + jobId
                                                        + '.';

                                        log.debug(message);
                                        return new TaskSubmissionException(message);
                                    });

            if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                final String message =
                        "Rejecting the task submission because the job manager leader id "
                                + jobMasterId
                                + " does not match the expected job manager leader id "
                                + jobManagerConnection.getJobMasterId()
                                + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
                final String message =
                        "No task slot allocated for job ID "
                                + jobId
                                + " and allocation ID "
                                + tdd.getAllocationId()
                                + '.';
                log.debug(message);
                throw new TaskSubmissionException(message);
            }

            // re-integrate offloaded data:
            try {
                tdd.loadBigData(blobCacheService.getPermanentBlobService());
            } catch (IOException | ClassNotFoundException e) {
