Flink 1.12.2 TaskSlotTable

Posted by 九师兄

1. Overview

Reposted from: Flink 1.12.2 Source Code Walkthrough: TaskSlotTable

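A container for multiple {@link TaskSlot} instances.
In addition, it maintains several indices for faster access to tasks and allocated slots.
Slots that cannot be assigned to a job manager time out automatically…
Before the TaskSlotTable can be used, it must be started via the {@link #start} method.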

2. The TaskSlotTable interface

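| Name | Description |
| --- | --- |
| start | Starts the task slot table with the given slot actions. |
| getAllocationIdsPerJob | Returns all AllocationIDs of the given job. |
| getActiveTaskSlotAllocationIds | Returns the AllocationIDs of all active task slots in the TaskSlotTable. |
| getActiveTaskSlotAllocationIdsPerJob | Returns the AllocationIDs of all active TaskSlots that belong to the given jobId. |
| createSlotReport | Returns a SlotReport. |
| allocateSlot | Allocates the slot with the given index for the given job and allocation id. If a negative index is given, a new auto-incremented index is generated. Returns true if the slot could be allocated, false otherwise. |
| markSlotActive | Marks the slot under the given allocation id as active. |
| markSlotInactive | Marks the slot under the given allocation id as inactive. |
| freeSlot | Tries to free the slot. If the slot is empty, it sets the task slot's state to free and returns its index. If the slot is not empty, it sets the task slot's state to releasing, fails all tasks and returns -1. |
| isValidTimeout | Checks whether the timeout identified by the given ticket is valid for the given allocation id. |
| isAllocated | Checks whether the slot with the given index is in state ALLOCATED for the given job and allocation id. |
| tryMarkSlotActive | Marks the slot matching the given JobID and AllocationID as ACTIVE, provided the slot is currently in state allocated. |
| isSlotFree | Checks whether the slot is in state free. |
| hasAllocatedSlots | Checks whether the job has allocated (not active) slots. |
| getAllocatedSlots | Returns all TaskSlots allocated for the given job id. |
| getOwningJob | Returns the JobID that owns the given AllocationID. |
| addTask | Adds the given task to the slot identified by the task's allocation id. |
| removeTask | Removes the task with the given execution attempt id from its task slot. If the owning task slot is in state releasing and is empty after the task has been removed, the slot is freed via the slot actions. |
| getTask | Gets the task for the given ExecutionAttemptID / JobID. |
| getCurrentAllocation | Returns the AllocationID currently assigned to the slot with the given index. |
| getTaskMemoryManager | Returns the MemoryManager for the given AllocationID. |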
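
The freeSlot and removeTask contracts listed above can be illustrated with a small toy model. This is only a sketch under assumptions, not Flink code: `ToySlot`, its `State` enum and the use of plain strings as task ids are hypothetical names that mimic the described return values (the slot index when the slot is empty, -1 when tasks remain).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model for illustration only; this is not Flink's TaskSlot.
class ToySlot {
    enum State { ALLOCATED, ACTIVE, RELEASING, FREE }

    final int index;
    final Set<String> tasks = new HashSet<>();
    State state = State.ALLOCATED;

    ToySlot(int index) {
        this.index = index;
    }

    /** Mimics the freeSlot contract: returns the index if the slot was empty, otherwise -1. */
    int freeSlot() {
        if (tasks.isEmpty()) {
            state = State.FREE;
            return index;
        }
        // Tasks are still present: mark the slot as releasing.
        // The real implementation would also fail the tasks at this point.
        state = State.RELEASING;
        return -1;
    }

    /** Mimics removeTask: a releasing slot becomes free once its last task is gone. */
    void removeTask(String taskId) {
        tasks.remove(taskId);
        if (state == State.RELEASING && tasks.isEmpty()) {
            state = State.FREE;
        }
    }
}
```

In this toy version the releasing slot frees itself directly; according to the table above, the real table instead delegates that step to the slot actions.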

3. TaskSlotTableImpl

TaskSlotTableImpl is the implementation of the TaskSlotTable interface. It is responsible for slot management on the TaskExecutor side.

# Construction call chain:
org.apache.flink.yarn.YarnTaskExecutorRunner#main
	org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManagerProcessSecurely
		org.apache.flink.runtime.taskexecutor.TaskManagerRunner#runTaskManager
			org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager  (returns the TaskExecutor instance)
				org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration

3.1. Fields

3.1.1. Slot-related fields

    /**
     * Number of slots in static slot allocation.
     * If a slot is requested with an index, the requested index must be within the range [0, numberSlots).
     *
     * When generating a slot report, we should always generate slots with an index in [0, numberSlots),
     * even if the slot does not exist.
     */
    private final int numberSlots;

    /** All task slots, keyed by slot index (index -> TaskSlot). */
    private final Map<Integer, TaskSlot<T>> taskSlots;

    /** Mapping from allocation id to task slot (AllocationID -> TaskSlot). */
    private final Map<AllocationID, TaskSlot<T>> allocatedSlots;

    /** Mapping from execution attempt id to task and task slot (ExecutionAttemptID -> TaskSlotMapping). */
    private final Map<ExecutionAttemptID, TaskSlotMapping<T>> taskSlotMappings;

    /** Mapping from job id to allocated slots for a job. */
    private final Map<JobID, Set<AllocationID>> slotsPerJob;

    /** Interface for slot actions, such as freeing them or timing them out. */
    @Nullable private SlotActions slotActions;

    /** The table state, one of CREATED, RUNNING, CLOSING or CLOSED. */
    private volatile State state;
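
The fields above keep several views of the same slots in sync. The following sketch shows the bookkeeping idea in isolation. It is a simplified illustration under assumptions: `ToySlotIndex` is a hypothetical class, and plain strings stand in for `AllocationID` and `JobID`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy index structure; String stands in for AllocationID and JobID.
class ToySlotIndex {
    final Map<Integer, String> allocationByIndex = new HashMap<>();      // slot index -> allocation id
    final Map<String, Integer> indexByAllocation = new HashMap<>();      // allocation id -> slot index
    final Map<String, Set<String>> allocationsPerJob = new HashMap<>();  // job id -> allocation ids

    /** Registers a slot in all three indices. */
    void add(int index, String jobId, String allocationId) {
        allocationByIndex.put(index, allocationId);
        indexByAllocation.put(allocationId, index);
        allocationsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(allocationId);
    }

    /** Removes a slot from all three indices. */
    void remove(String jobId, String allocationId) {
        Integer index = indexByAllocation.remove(allocationId);
        if (index != null) {
            allocationByIndex.remove(index);
        }
        Set<String> allocations = allocationsPerJob.get(jobId);
        if (allocations != null) {
            allocations.remove(allocationId);
            // Drop the job entry once its last allocation is gone.
            if (allocations.isEmpty()) {
                allocationsPerJob.remove(jobId);
            }
        }
    }
}
```

The price of fast lookups by index, allocation id and job is that every mutation has to touch each map, which is why the allocate and free paths shown later update several collections in a row.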


3.1.2. Other fields

    /** Slot resource profile for static slot allocation. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Page size for memory manager. */
    private final int memoryPageSize;

    /** Timer service used to time out allocated slots. */
    private final TimerService<AllocationID> timerService;


    private final ResourceBudgetManager budgetManager;

    /** The closing future is completed when all slot are freed and state is closed. */
    private final CompletableFuture<Void> closingFuture;

    /** {@link ComponentMainThreadExecutor} to schedule internal calls to the main thread. */
    private ComponentMainThreadExecutor mainThreadExecutor =
            new DummyComponentMainThreadExecutor(
                    "TaskSlotTableImpl is not initialized with proper main thread executor, "
                            + "call to TaskSlotTableImpl#start is required");

    /** {@link Executor} for background actions, e.g. verify all managed memory released. */
    private final Executor memoryVerificationExecutor;

3.1.3. Constructor

    public TaskSlotTableImpl(
            final int numberSlots,
            final ResourceProfile totalAvailableResourceProfile,
            final ResourceProfile defaultSlotResourceProfile,
            final int memoryPageSize,
            final TimerService<AllocationID> timerService,
            final Executor memoryVerificationExecutor) {
        Preconditions.checkArgument(
                0 < numberSlots, "The number of task slots must be greater than 0.");

        this.numberSlots = numberSlots;
        this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile);
        this.memoryPageSize = memoryPageSize;

        this.taskSlots = new HashMap<>(numberSlots);

        this.timerService = Preconditions.checkNotNull(timerService);

        budgetManager =
                new ResourceBudgetManager(
                        Preconditions.checkNotNull(totalAvailableResourceProfile));

        allocatedSlots = new HashMap<>(numberSlots);

        taskSlotMappings = new HashMap<>(4 * numberSlots);

        slotsPerJob = new HashMap<>(4);

        slotActions = null;
        state = State.CREATED;
        closingFuture = new CompletableFuture<>();

        this.memoryVerificationExecutor = memoryVerificationExecutor;
    }

3.2. Methods

3.2.1. start

    @Override
    public void start(
            SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
        Preconditions.checkState(
                state == State.CREATED,
                "The %s has to be just created before starting",
                TaskSlotTableImpl.class.getSimpleName());
        this.slotActions = Preconditions.checkNotNull(initialSlotActions);
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);

        timerService.start(this);
        // Switch the state to RUNNING
        state = State.RUNNING;
    }
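
The state guard in start can be sketched without any Flink classes. The example below is a minimal illustration, assuming a hypothetical `ToyLifecycle` class; it only reproduces the rule that start is legal in state CREATED and nowhere else.

```java
// Hypothetical toy lifecycle for illustration; not Flink's TaskSlotTableImpl.
class ToyLifecycle {
    enum State { CREATED, RUNNING, CLOSING, CLOSED }

    private volatile State state = State.CREATED;

    /** Only a freshly created instance may be started. */
    void start() {
        if (state != State.CREATED) {
            throw new IllegalStateException("start() requires state CREATED but was " + state);
        }
        state = State.RUNNING;
    }

    /** Guard used by operations that are only legal while running. */
    void checkRunning() {
        if (state != State.RUNNING) {
            throw new IllegalStateException("Expected state RUNNING but was " + state);
        }
    }

    State getState() {
        return state;
    }
}
```

Calling `start()` a second time throws an `IllegalStateException`, which mirrors the `Preconditions.checkState` call in the method above.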

3.2.2. closeAsync

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (state == State.CREATED) {
            state = State.CLOSED;
            closingFuture.complete(null);
        } else if (state == State.RUNNING) {
            state = State.CLOSING;
            final FlinkException cause = new FlinkException("Closing task slot table");
            CompletableFuture<Void> cleanupFuture =
                    FutureUtils.waitForAll(
                                    // Free every allocated slot
                                    new ArrayList<>(allocatedSlots.values())
                                            .stream()
                                                    .map(slot -> freeSlotInternal(slot, cause))
                                                    .collect(Collectors.toList()))
                            .thenRunAsync(
                                    () -> {
                                        state = State.CLOSED;
                                        timerService.stop();
                                    },
                                    mainThreadExecutor);
            FutureUtils.forward(cleanupFuture, closingFuture);
        }
        return closingFuture;
    }
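
The closing pattern above (wait for every slot to finish closing, then flip the state and complete the closing future) can be approximated with the JDK alone. This is a sketch under assumptions: `ToyCloser` is a hypothetical class, and `CompletableFuture.allOf` is used here as a stand-in for Flink's `FutureUtils.waitForAll`, with no main thread executor involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy closer; CompletableFuture.allOf stands in for FutureUtils.waitForAll.
class ToyCloser {
    enum State { RUNNING, CLOSING, CLOSED }

    volatile State state = State.RUNNING;
    // One future per slot that still has to finish closing.
    final List<CompletableFuture<Void>> slotClosings = new ArrayList<>();
    final CompletableFuture<Void> closingFuture = new CompletableFuture<>();

    CompletableFuture<Void> closeAsync() {
        if (state == State.RUNNING) {
            state = State.CLOSING;
            CompletableFuture.allOf(slotClosings.toArray(new CompletableFuture<?>[0]))
                    .thenRun(() -> state = State.CLOSED)
                    // Forward the result (or failure) to the externally visible future.
                    .whenComplete(
                            (ignored, error) -> {
                                if (error != null) {
                                    closingFuture.completeExceptionally(error);
                                } else {
                                    closingFuture.complete(null);
                                }
                            });
        }
        return closingFuture;
    }
}
```

Repeated calls return the same future, so callers can safely invoke `closeAsync()` more than once.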

3.2.3. createSlotReport

    @Override
    public SlotReport createSlotReport(ResourceID resourceId) {
        List<SlotStatus> slotStatuses = new ArrayList<>();

        // Iterate over every static slot index
        for (int i = 0; i < numberSlots; i++) {
            // Build the SlotID
            SlotID slotId = new SlotID(resourceId, i);
            SlotStatus slotStatus;
            if (taskSlots.containsKey(i)) {
                // This slot is already allocated
                TaskSlot<T> taskSlot = taskSlots.get(i);
                // Build the SlotStatus
                slotStatus =
                        new SlotStatus(
                                slotId,
                                taskSlot.getResourceProfile(),
                                taskSlot.getJobId(),
                                taskSlot.getAllocationId());
            } else {
                // This slot is not allocated yet
                slotStatus = new SlotStatus(slotId, defaultSlotResourceProfile, null, null);
            }

            slotStatuses.add(slotStatus);
        }

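        // Additionally report dynamically allocated slots, which have a negative index
        for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
            // A negative index marks a dynamic slot, so it gets a generated dynamic SlotID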
            if (taskSlot.getIndex() < 0) {
                SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
                SlotStatus slotStatus =
                        new SlotStatus(
                                slotID,
                                taskSlot.getResourceProfile(),
                                taskSlot.getJobId(),
                                taskSlot.getAllocationId());
                slotStatuses.add(slotStatus);
            }
        }

        // Build the SlotReport
        final SlotReport slotReport = new SlotReport(slotStatuses);

        return slotReport;
    }
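
The two loops in createSlotReport can be summarised as: always emit one entry per static index, allocated or not, then append one entry per dynamic slot. The sketch below shows that shape with plain collections. It is an illustration under assumptions: `ToySlotReport` and its string-based report entries are hypothetical and only stand in for `SlotStatus` and `SlotReport`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical toy report builder; strings stand in for SlotStatus entries.
class ToySlotReport {
    /**
     * Emits one entry per static index in [0, numberSlots), whether allocated or not,
     * followed by one entry per dynamically allocated slot.
     */
    static List<String> create(
            int numberSlots,
            Map<Integer, String> staticAllocations,
            List<String> dynamicAllocations) {
        List<String> report = new ArrayList<>();
        for (int i = 0; i < numberSlots; i++) {
            String allocation = staticAllocations.get(i);
            report.add("slot-" + i + ":" + (allocation == null ? "free" : allocation));
        }
        for (String allocation : dynamicAllocations) {
            report.add("dynamic:" + allocation);
        }
        return report;
    }
}
```

With three static slots, index 1 allocated, and one dynamic allocation, the report contains four entries, and the static ones always appear first in index order.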

3.2.4. allocateSlot


    @Override
    public boolean allocateSlot(
            int index,
            JobID jobId,
            AllocationID allocationId,
            ResourceProfile resourceProfile,
            Time slotTimeout) {
        checkRunning();

        Preconditions.checkArgument(index < numberSlots);

        // Look up an existing TaskSlot for this allocation id
        TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);

        if (taskSlot != null) {
            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
            return false;
        }

        // If taskSlots already contains the index
        if (taskSlots.containsKey(index)) {

            TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
            LOG.info(
                    "Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
                    index,
                    duplicatedTaskSlot.getResourceProfile(),
                    duplicatedTaskSlot.getJobId(),
                    duplicatedTaskSlot.getAllocationId());

            return duplicatedTaskSlot.getJobId().equals(jobId)
                    && duplicatedTaskSlot.getAllocationId().equals(allocationId);
        } else if (allocatedSlots.containsKey(allocationId)) {
            return true;
        }

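        // Determine the resource profile: a non-negative index uses the default slot profile,
        // a negative index uses the requested profile (sample debugger values follow)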

        //    resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //        cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"
        //        taskHeapMemory = {MemorySize@6140} "100663293 bytes"
        //        taskOffHeapMemory = {MemorySize@6141} "0 bytes"
        //        managedMemory = {MemorySize@6142} "134217730 bytes"
        //        networkMemory = {MemorySize@6143} "32 mb"
        //        extendedResources = {HashMap@6144}  size = 0
        resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;


        // Reserve the resources for this profile from the budget
        if (!budgetManager.reserve(resourceProfile)) {
            LOG.info(
                    "Cannot allocate the requested resources. Trying to allocate {}, "
                            + "while the currently remaining available resources are {}, total is {}.",
                    resourceProfile,
                    budgetManager.getAvailableBudget(),
                    budgetManager.getTotalBudget());
            return false;
        }

        // Build the taskSlot (sample debugger values follow)

        //  taskSlot = {TaskSlot@6191} "TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: a9ce7abc6f1d6f264dbdce5564efcb76, jobId: 05fdf1bc744b274be1525c918c1ad378)"
        //    index = 0
        //    resourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
        //    tasks = {HashMap@6197}  size = 0
        //    memoryManager = {MemoryManager@6198}
        //    state = {TaskSlotState@6199} "ALLOCATED"
        //    jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"
        //    allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"
        //    closingFuture = {CompletableFuture@6200} "java.util.concurrent.CompletableFuture@670d0482[Not completed]"
        //    asyncExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"
        taskSlot =
                new TaskSlot<>(
                        index,
                        resourceProfile,
                        memoryPageSize,
                        jobId,
                        allocationId,
                        memoryVerificationExecutor);


        if (index >= 0) {
            // add the slot to the index -> TaskSlot map
            taskSlots.put(index, taskSlot);
        }

        // update the allocation id to task slot map
        allocatedSlots.put(allocationId, taskSlot);

        // register a timeout for this slot since it's in state allocated
        timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

        // add this slot to the set of job slots
        Set<AllocationID> slots = slotsPerJob.get(jobId);

        if (slots == null) {
            slots = new HashSet<>(4);
            slotsPerJob.put(jobId, slots);
        }

        slots.add(allocationId);

        return true;
    }
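
The `budgetManager.reserve(...)` check above rejects an allocation when the remaining resources are insufficient. The following sketch illustrates that reserve/release idea with a single numeric resource. It is a simplification under assumptions: `ToyBudget` is a hypothetical class that tracks one `long` value, whereas the real budget manager works on full `ResourceProfile` objects.

```java
// Hypothetical toy budget with a single numeric resource; not Flink's ResourceBudgetManager.
class ToyBudget {
    private final long total;
    private long available;

    ToyBudget(long total) {
        if (total < 0) {
            throw new IllegalArgumentException("total must not be negative");
        }
        this.total = total;
        this.available = total;
    }

    /** Reserves the amount if it fits into the remaining budget; otherwise changes nothing. */
    boolean reserve(long amount) {
        if (amount < 0 || amount > available) {
            return false;
        }
        available -= amount;
        return true;
    }

    /** Returns a previously reserved amount to the budget. */
    void release(long amount) {
        if (amount < 0 || amount > total - available) {
            throw new IllegalStateException("Cannot release more than was reserved");
        }
        available += amount;
    }

    long getAvailable() {
        return available;
    }
}
```

A failed `reserve` leaves the budget untouched, which matches the way allocateSlot simply logs and returns false without creating a slot.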

3.2.5. freeSlot -> freeSlotInternal

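If the slot is empty, freeSlotInternal removes it from the various indices, unregisters its timeout and returns its resources to the budget. In either case it then calls taskSlot.closeAsync(cause) to shut down the tasks in the slot.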


    private CompletableFuture<Void> freeSlotInternal(TaskSlot<T> taskSlot, Throwable cause) {
        AllocationID allocationId = taskSlot.getAllocationId();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Free slot {}.", taskSlot, cause);
        } else {
            LOG.info("Free slot {}.", taskSlot);
        }

        if (taskSlot.isEmpty()) {
            // remove the allocation id to task slot mapping
            allocatedSlots.remove(allocationId);

            // unregister a potential timeout
            timerService.unregisterTimeout(allocationId);

            JobID jobId = taskSlot.getJobId();
            Set<AllocationID> slots = slotsPerJob.get(jobId);

            if (slots == null) {
                throw new IllegalStateException(
                        "There are no more slots allocated for the job "
                                + jobId
                                + ". This indicates a programming bug.");
            }

            slots.remove(allocationId);

            if (slots.isEmpty()) {
                slotsPerJob.remove(jobId);
            }

            taskSlots.remove(taskSlot.getIndex());
            budgetManager.release(taskSlot.getResourceProfile());
        }
        return taskSlot.closeAsync(cause);
    }

3.2.6. Task-related methods (add/remove/getTask)


    @Override
    public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
        checkRunning();
        Preconditions.checkNotNull(task);

        TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());

        if (taskSlot != null) {
            if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
                  // Based on the task's  task.getExecutionId(
