[Flink] Flink 1.12.2 SlotManager

Posted by 九师兄


Preface: this article was compiled by the editors of 小常识网 (cha138.com). It introduces the Flink 1.12.2 SlotManager; we hope you find it useful.

1. Overview

Reposted from: Flink 1.12.2 Source Code Analysis: SlotManager

The SlotManager lives inside the ResourceManager.

The SlotManager maintains a view of all registered TaskManager slots, their allocations, and all pending slot requests.

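Whenever a new slot is registered or an allocated slot is freed, the SlotManager tries to fulfill another pending slot request. Whenever there are not enough free slots, it notifies the ResourceManager via {@link ResourceActions#allocateResource(WorkerResourceSpec)}.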
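
To make this matching loop concrete, here is a toy model in Java. It is a hypothetical sketch, not Flink code: slots and requests are plain String ids, resource profiles are ignored, and the `allocateResourceCalls` list merely stands in for `ResourceActions#allocateResource`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of the SlotManager's matching loop (not Flink code). */
final class ToySlotManager {
    // slotId -> id of the request that currently holds the slot
    private final Map<String, String> allocations = new HashMap<>();
    // free slots, oldest first
    private final Deque<String> freeSlots = new ArrayDeque<>();
    // pending requests, oldest first
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    // stand-in for ResourceActions#allocateResource: one entry per "need more resources" call
    final List<String> allocateResourceCalls = new ArrayList<>();

    /** A new slot was registered, or an allocated slot was freed. */
    void slotAvailable(String slotId) {
        allocations.remove(slotId);
        String next = pendingRequests.poll();
        if (next != null) {
            allocations.put(slotId, next); // first try to fulfill a pending request
        } else {
            freeSlots.add(slotId);
        }
    }

    /** A job requests a slot. */
    void requestSlot(String requestId) {
        String slot = freeSlots.poll();
        if (slot != null) {
            allocations.put(slot, requestId);
        } else {
            pendingRequests.add(requestId);
            allocateResourceCalls.add(requestId); // not enough free slots: ask for resources
        }
    }

    String allocationOf(String slotId) {
        return allocations.get(slotId);
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    int freeCount() {
        return freeSlots.size();
    }
}
```

For example, requesting a slot while none is free leaves the request pending and records one `allocateResource` call; a slot registered afterwards goes straight to that request.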

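To release resources and avoid resource leaks, idle TaskManagers (TaskManagers none of whose slots are currently in use) and pending slot requests time out, which triggers their release and their failure, respectively.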
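
The timeout rule can be sketched the same way. `TimeoutSketch` is hypothetical: it takes the current time (in milliseconds) as an explicit parameter so the logic is easy to test, whereas SlotManagerImpl drives its checks from the scheduled executor (see `start` below).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the two timeouts (not Flink code); times are in milliseconds. */
final class TimeoutSketch {
    private final long taskManagerTimeoutMs;
    private final long slotRequestTimeoutMs;
    // TaskManager id -> time since which none of its slots has been in use
    private final Map<String, Long> idleSince = new HashMap<>();
    // request id -> time at which the request was registered
    private final Map<String, Long> pendingSince = new HashMap<>();

    TimeoutSketch(long taskManagerTimeoutMs, long slotRequestTimeoutMs) {
        this.taskManagerTimeoutMs = taskManagerTimeoutMs;
        this.slotRequestTimeoutMs = slotRequestTimeoutMs;
    }

    void markIdle(String taskManagerId, long now) {
        idleSince.putIfAbsent(taskManagerId, now); // keep the earliest idle time
    }

    void markBusy(String taskManagerId) {
        idleSince.remove(taskManagerId);
    }

    void addPendingRequest(String requestId, long now) {
        pendingSince.put(requestId, now);
    }

    /** Removes and returns TaskManagers idle for at least the TaskManager timeout (to release). */
    List<String> releaseIdleTaskManagers(long now) {
        return expire(idleSince, now, taskManagerTimeoutMs);
    }

    /** Removes and returns requests pending for at least the slot request timeout (to fail). */
    List<String> failTimedOutRequests(long now) {
        return expire(pendingSince, now, slotRequestTimeoutMs);
    }

    private static List<String> expire(Map<String, Long> since, long now, long timeoutMs) {
        List<String> expired = new ArrayList<>();
        since.entrySet().removeIf(e -> {
            boolean timedOut = now - e.getValue() >= timeoutMs;
            if (timedOut) {
                expired.add(e.getKey());
            }
            return timedOut;
        });
        return expired;
    }
}
```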

2. SlotManager

2.1. Introduction

SlotManager is an interface that defines the operations the ResourceManager uses to manage slots.

2.2. Interface methods

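Method | Description
start | Starts the SlotManager
suspend | Suspends the SlotManager
int getNumberRegisteredSlots() | Returns the number of registered slots
int getNumberRegisteredSlotsOf(InstanceID instanceId) | Returns the number of slots registered by the TaskManager with the given InstanceID
int getNumberFreeSlots() | Returns the number of free slots
int getNumberFreeSlotsOf(InstanceID instanceId) | Returns the number of free slots of the TaskManager with the given InstanceID
Map<WorkerResourceSpec, Integer> getRequiredResources() | Returns the number of workers the SlotManager has requested from {@link ResourceActions} that are not yet fulfilled
ResourceProfile getRegisteredResource() | Returns the registered resource
ResourceProfile getRegisteredResourceOf(InstanceID instanceID) | Returns the registered resource of the TaskManager with the given InstanceID
ResourceProfile getFreeResource() | Returns the free resource
ResourceProfile getFreeResourceOf(InstanceID instanceID) | Returns the free resource of the TaskManager with the given InstanceID
int getNumberPendingSlotRequests() | Returns the number of pending slot requests
void processResourceRequirements(ResourceRequirements resourceRequirements) | Notifies the slot manager about the resources a job requires
boolean registerSlotRequest(SlotRequest slotRequest) | Registers (sends) a slot request
boolean unregisterSlotRequest(AllocationID allocationId) | Cancels a slot request
registerTaskManager | Registers a TaskManager
unregisterTaskManager | Unregisters a TaskManager
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) | Reports the slot status of a TaskManager
void freeSlot(SlotID slotId, AllocationID allocationId) | Frees a slot
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) | Sets whether unfulfillable slot requests fail immediately (true) or are allowed to stay pending (false)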

3. SlotManagerImpl

SlotManagerImpl is the implementation of the SlotManager interface.

3.1. Fields


    
    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    /** Slots of TaskManagers that have been requested via ResourceActions but not registered yet. */
    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    /** Strategy used to pick a free slot that matches a requested resource profile. */
    private final SlotMatchingStrategy slotMatchingStrategy;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    /** Periodic check for idle TaskManager timeouts and TaskManager redundancy (see start()). */
    private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    /** Periodic check for slot request timeouts (see start()). */
    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    /**
     * Release task executor only when each produced result partition is either consumed or failed.
     */
    private final boolean waitResultConsumedBeforeRelease;

    /** Defines the max limitation of the total number of slots. */
    private final int maxSlotNum;

    /** Defines the number of redundant taskmanagers. */
    private final int redundantTaskManagerNum;

    /**
     * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request
     * to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a
     * slot that is already registered (including allocated ones) nor a pending slot that the {@link
     * ResourceActions} can allocate.
     */
    private boolean failUnfulfillableRequest = true;

    /** The default resource spec of workers to request. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;

    /** Number of slots each requested worker provides. */
    private final int numSlotsPerWorker;

    /** Default profile of one slot, derived from defaultWorkerResourceSpec and numSlotsPerWorker. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Metric group under which the slot manager registers its metrics. */
    private final SlotManagerMetricGroup slotManagerMetricGroup;
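
Note that `freeSlots` is a `LinkedHashMap` rather than a `HashMap`, so iterating it follows insertion order. A plausible reading (our assumption, not stated in the source) is that this keeps slot matching deterministic when a matching strategy scans the free slots and takes the first fit. The hypothetical sketch below reduces a slot to an id plus a memory size in MB.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical first-fit matching over an insertion-ordered free-slot index (not Flink code). */
final class FreeSlotIndex {
    // slotId -> memory of that slot in MB; iteration follows insertion order
    private final LinkedHashMap<String, Integer> freeSlots = new LinkedHashMap<>();

    void addFree(String slotId, int memoryMb) {
        freeSlots.put(slotId, memoryMb);
    }

    /** Takes the earliest-inserted free slot with enough memory out of the index. */
    Optional<String> takeFirstMatching(int requiredMb) {
        String match = null;
        for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
            if (e.getValue() >= requiredMb) {
                match = e.getKey();
                break; // stop iterating before modifying the map
            }
        }
        if (match == null) {
            return Optional.empty();
        }
        freeSlots.remove(match);
        return Optional.of(match);
    }
}
```

A slot that is freed again is re-inserted at the end, so it is considered after the slots that have been free longer.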


3.2. Constructor

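The constructor is plain field assignment and initialization: it reads the timeouts, the slot matching strategy, the default worker spec and the slot limits from SlotManagerConfiguration and creates empty maps for the bookkeeping state.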


    public SlotManagerImpl(
            ScheduledExecutor scheduledExecutor,
            SlotManagerConfiguration slotManagerConfiguration,
            SlotManagerMetricGroup slotManagerMetricGroup) {

        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);

        Preconditions.checkNotNull(slotManagerConfiguration);
        this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
        this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
        this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
        this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
        this.waitResultConsumedBeforeRelease =
                slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
        this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
        this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
        this.defaultSlotResourceProfile =
                generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
        this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
        this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
        this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutsAndRedundancyCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

3.3. Methods

3.3.1. start

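start() stores the ResourceManager id, the main-thread executor and the ResourceActions, marks the SlotManager as started, and schedules two periodic checks with a fixed delay: one for TaskManager timeouts and redundancy (every taskManagerTimeout) and one for slot request timeouts (every slotRequestTimeout). Both checks run on the main-thread executor. Finally it registers the SlotManager metrics.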


    /**
     * Starts the slot manager with the given leader id and resource manager actions.
     *
     * @param newResourceManagerId to use for communication with the task managers
     * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
     * @param newResourceActions to use for resource (de-)allocations
     */
    @Override
    public void start(
            ResourceManagerId newResourceManagerId,
            Executor newMainThreadExecutor,
            ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutsAndRedundancyCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () ->
                                mainThreadExecutor.execute(
                                        () -> checkTaskManagerTimeoutsAndRedundancy()),
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
                        0L,
                        slotRequestTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);

        registerSlotManagerMetrics();
    }
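
The scheduling pattern in start() (a timer thread only decides when a check is due, and the check itself is handed to the main-thread executor) can be reproduced with plain java.util.concurrent. This is a hypothetical sketch: a single-threaded ExecutorService stands in for Flink's main-thread executor, and `checkTimeouts` is a placeholder for the real checks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the start()/suspend() scheduling pattern (not Flink code). */
final class PeriodicCheckSketch {
    // timer thread: only decides *when* a check is due
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // stand-in for the ResourceManager's main-thread executor: runs the check itself
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    final AtomicInteger checksRun = new AtomicInteger();
    private ScheduledFuture<?> timeoutCheck;

    void start(long periodMs) {
        // same shape as SlotManagerImpl#start: fixed delay, initial delay 0,
        // and the scheduled task only hands the work over to the main thread
        timeoutCheck = timer.scheduleWithFixedDelay(
                () -> mainThread.execute(this::checkTimeouts), 0L, periodMs, TimeUnit.MILLISECONDS);
    }

    private void checkTimeouts() {
        checksRun.incrementAndGet(); // placeholder for e.g. checkSlotRequestTimeouts()
    }

    void suspend() {
        if (timeoutCheck != null) {
            timeoutCheck.cancel(false); // like suspend(): do not interrupt a running check
            timeoutCheck = null;
        }
        // stop the timer first so that nothing is submitted to a shut-down main thread
        timer.shutdown();
        awaitQuietly(timer);
        mainThread.shutdown();
        awaitQuietly(mainThread);
    }

    private static void awaitQuietly(ExecutorService executor) {
        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Running every check on one thread means the checks never race with other updates of the SlotManager's maps, which matches the field comment on mainThreadExecutor ("callbacks which have to be synchronized").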

3.3.2. suspend

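suspend() cancels both periodic checks, cancels and clears all pending slot requests, unregisters every registered TaskManager (with a SlotManagerException), and resets the ResourceManager id and the ResourceActions.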


    /** Suspends the component. This clears the internal state of the slot manager. */
    @Override
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutsAndRedundancyCheck != null) {
            taskManagerTimeoutsAndRedundancyCheck.cancel(false);
            taskManagerTimeoutsAndRedundancyCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers =
                new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(
                    registeredTaskManager,
                    new SlotManagerException("The slot manager is being suspended."));
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }
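
suspend() copies taskManagerRegistrations.keySet() into an ArrayList before looping, because unregisterTaskManager removes entries from that same map. The hypothetical sketch below isolates that pattern with a plain HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of suspend()'s "copy the keys, then unregister" loop (not Flink code). */
final class UnregisterAllSketch {
    // instance id -> number of slots of that TaskManager
    final Map<String, Integer> taskManagerRegistrations = new HashMap<>();

    void unregisterTaskManager(String instanceId) {
        taskManagerRegistrations.remove(instanceId); // mutates the map being "iterated"
    }

    void unregisterAll() {
        // Snapshot first: looping over keySet() directly while unregisterTaskManager removes
        // entries typically fails with a ConcurrentModificationException.
        List<String> registered = new ArrayList<>(taskManagerRegistrations.keySet());
        for (String instanceId : registered) {
            unregisterTaskManager(instanceId);
        }
    }
}
```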


3.3.3. int getNumberRegisteredSlots()

Returns the number of registered slots.

    @Override
    public int getNumberRegisteredSlots() {
        return slots.size();
    }

3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)

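Returns the number of slots registered by the TaskManager with the given InstanceID, or 0 if that TaskManager is not registered.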

    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberRegisteredSlots();
        } else {
            return 0;
        }
    }


3.3.5. int getNumberFreeSlots()

Returns the number of free slots.

    @Override
    public int getNumberFreeSlots() {
        return freeSlots.size();
    }


3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)

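Returns the number of free slots of the TaskManager with the given InstanceID, or 0 if that TaskManager is not registered.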

    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

        if (taskManagerRegistration != null) {
            return taskManagerRegistration.getNumberFreeSlots();
        } else {
            return 0;
        }
    }

3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();

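Returns the number of workers the SlotManager has requested from {@link ResourceActions} that are not yet fulfilled. SlotManagerImpl computes it as the number of pending slots divided by numSlotsPerWorker, rounded up, and reports all of them under the default WorkerResourceSpec.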


    @Override
    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
        final int pendingWorkerNum =
                MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
        return pendingWorkerNum > 0
                ? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum)
                : Collections.emptyMap();
    }
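
The arithmetic is a ceiling division: with numSlotsPerWorker = 2, five pending slots ask for three workers. The sketch below reproduces it with a hypothetical `divideRoundUp` helper written for this example (the same idea as the MathUtils.divideRoundUp call above) and reduces the WorkerResourceSpec key to a String.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch of getRequiredResources() arithmetic (not Flink code). */
final class RequiredResourcesSketch {
    /** Ceiling division for dividend >= 0 and divisor > 0. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("need dividend >= 0 and divisor > 0");
        }
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    /** Same shape as getRequiredResources(), with the worker spec reduced to a String key. */
    static Map<String, Integer> requiredWorkers(
            int pendingSlots, int numSlotsPerWorker, String defaultWorkerSpec) {
        int pendingWorkerNum = divideRoundUp(pendingSlots, numSlotsPerWorker);
        return pendingWorkerNum > 0
                ? Collections.singletonMap(defaultWorkerSpec, pendingWorkerNum)
                : Collections.emptyMap();
    }
}
```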


3.3.8. ResourceProfile getRegisteredResource();

Returns the registered resource, computed from the number of registered slots.


    @Override
    public ResourceProfile getRegisteredResource() {
        return getResourceFromNumSlots(getNumberRegisteredSlots());
    }


3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);

Returns the registered resource of the TaskManager with the given InstanceID.

    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
    }


3.3.10. ResourceProfile getFreeResource();

Returns the free resource, computed from the number of free slots.

    @Override
    public ResourceProfile getFreeResource() {
        return getResourceFromNumSlots(getNumberFreeSlots());
    }

3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);

Returns the free resource of the TaskManager with the given InstanceID.

    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
    }
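
All four resource getters go through getResourceFromNumSlots, i.e. they derive a ResourceProfile from a slot count, which implicitly assumes every slot has the default slot profile. Our reading of the 1.12 source (an assumption worth double-checking) is that it returns ResourceProfile.UNKNOWN for a negative count or a missing default profile and otherwise multiplies defaultSlotResourceProfile by the count, where defaultSlotResourceProfile comes from splitting defaultWorkerResourceSpec across numSlotsPerWorker in the constructor. The hypothetical `SimpleProfile` below models a profile with only CPU cores and memory in MB, and uses null where Flink would use ResourceProfile.UNKNOWN.

```java
/** Hypothetical two-dimensional stand-in for ResourceProfile (not Flink code). */
final class SimpleProfile {
    final double cpuCores;
    final long memoryMb;

    SimpleProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Evenly splits a worker spec into one slot's share (assumed behavior, simplified). */
    SimpleProfile divide(int numSlots) {
        return new SimpleProfile(cpuCores / numSlots, memoryMb / numSlots);
    }

    /** Scales a per-slot profile by a slot count. */
    SimpleProfile multiply(int n) {
        return new SimpleProfile(cpuCores * n, memoryMb * n);
    }

    /** Mirrors our reading of getResourceFromNumSlots; null stands for "unknown". */
    static SimpleProfile fromNumSlots(SimpleProfile defaultSlotProfile, int numSlots) {
        if (numSlots < 0 || defaultSlotProfile == null) {
            return null;
        }
        return defaultSlotProfile.multiply(numSlots);
    }
}
```

For example, a worker with 4 cores and 4096 MB split into 4 slots gives 1 core and 1024 MB per slot, so 3 registered slots report 3 cores and 3072 MB.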


3.3.12. int getNumberPendingSlotRequests();

Returns the number of pending slot requests.

    @Override
    public int getNumberPendingSlotRequests() {
        return pendingSlotRequests.size();
    }

3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);

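Notifies the slot manager about the resources a job requires. In SlotManagerImpl this is a no-op; as the code comment explains, it must not throw because the ResourceManager may call it whether or not declarative resource management is in use.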

    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
        // no-op; don't throw an UnsupportedOperationException here because there are code paths
        // where the resource manager calls this method regardless of whether declarative
        // resource management is used or not
    }


3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)

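Registers (sends) a slot request. A duplicate request (detected by checkDuplicateRequest on the AllocationID) is ignored and false is returned. Otherwise the request is wrapped in a PendingSlotRequest, stored in pendingSlotRequests and passed to internalRequestSlot; if that throws, the pending request is removed again and a ResourceManagerException is thrown. internalRequestSlot first looks for a matching free slot and, if there is none, falls back to a pending TaskManager slot.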


    /**
     * Requests a slot with the respective resource profile.
     *
     * @param slotRequest specifying the requested slot specs
     * @return true if the slot request was registered; false if the request is a duplicate
     * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
     */
    @Override
    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug(
                    "Ignoring a duplicate slot request with allocation id {}.",
                    slotRequest.getAllocationId());

            return false;
        } else {
            // wrap the request in a PendingSlotRequest
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                // try to allocate a slot for the request
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new ResourceManagerException(
                        "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }
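
The duplicate check together with the rollback on failure can be sketched as follows. Everything here is hypothetical: allocation ids and profiles are plain Strings, the Runnable stands in for internalRequestSlot, and `isDuplicate` assumes that checkDuplicateRequest looks at both pendingSlotRequests and fulfilledSlotRequests (consistent with the "request deduplication" javadoc of fulfilledSlotRequests in 3.1).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of registerSlotRequest's dedup + rollback logic (not Flink code). */
final class SlotRequestRegistry {
    // allocation id -> requested profile
    private final Map<String, String> pendingSlotRequests = new HashMap<>();
    // allocation id -> slot id, kept so that late duplicates are still recognized
    private final Map<String, String> fulfilledSlotRequests = new HashMap<>();

    boolean isDuplicate(String allocationId) {
        return pendingSlotRequests.containsKey(allocationId)
                || fulfilledSlotRequests.containsKey(allocationId);
    }

    /** Returns false for duplicates; removes the pending entry again if allocation fails. */
    boolean register(String allocationId, String profile, Runnable tryAllocate) {
        if (isDuplicate(allocationId)) {
            return false;
        }
        pendingSlotRequests.put(allocationId, profile);
        try {
            tryAllocate.run();
        } catch (RuntimeException e) {
            pendingSlotRequests.remove(allocationId); // roll back the pending entry
            throw new IllegalStateException(
                    "Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    /** Moves a request from pending to fulfilled once a slot has been assigned. */
    void fulfill(String allocationId, String slotId) {
        if (pendingSlotRequests.remove(allocationId) != null) {
            fulfilledSlotRequests.put(allocationId, slotId);
        }
    }

    int pendingCount() {
        return pendingSlotRequests.size();
    }
}
```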


    /**
     * Tries to allocate a slot for the given slot request. If there is no slot available, the
     * resource manager is informed to allocate more resources and a timeout for the request is
     * registered.
     *
     * @param pendingSlotRequest to allocate a slot for
     * @throws ResourceManagerException if the slot request failed or is unfulfillable
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
            throws ResourceManagerException {
        // the resource profile requested by the slot request
        final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

        OptionalConsumer.of(findMatchingSlot(resourceProfile))
                .ifPresent(
                        taskManagerSlot -> {
                            // a matching free slot exists: allocate it
                            allocateSlot(taskManagerSlot, pendingSlotRequest);
                        })
                .ifNotPresent(
                        () -> {
                            // no matching free slot: fall back to a pending TaskManager slot,
                            // which may require starting a new TaskManager
                            fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
                        });
    }
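
The branch in internalRequestSlot is "use a matching free slot if there is one, otherwise take the fallback path". On Java 9+ the same shape can be written with Optional#ifPresentOrElse; Flink 1.12 still supported Java 8, which is presumably why it uses its own OptionalConsumer helper instead. In this hypothetical sketch a slot is just its memory size in MB, and the fallback only records that the request stays pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of internalRequestSlot's "free slot, else fallback" branch (not Flink code). */
final class MatchOrFallbackSketch {
    // free slots, reduced to their memory size in MB
    final List<Integer> freeSlotMemoryMb = new ArrayList<>();
    // what happened to each request, in order
    final List<String> log = new ArrayList<>();

    Optional<Integer> findMatchingSlot(int requiredMb) {
        return freeSlotMemoryMb.stream().filter(m -> m >= requiredMb).findFirst();
    }

    void internalRequestSlot(int requiredMb) {
        findMatchingSlot(requiredMb).ifPresentOrElse(
                slot -> {
                    freeSlotMemoryMb.remove(slot); // Integer argument: remove(Object), not remove(int)
                    log.add("allocated " + slot);
                },
                // in SlotManagerImpl this is fulfillPendingSlotRequestWithPendingTaskManagerSlot(...)
                () -> log.add("pending " + requiredMb));
    }
}
```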

3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)

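Cancels a slot request: the pending request with the given AllocationID is removed and cancelled. If there is no such pending request, nothing happens and false is returned.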


    /**
     * Cancels and removes a pending slot request with the given allocation id. If there is no such
     * pending request, then nothing is done.
     *
     * @param allocationId identifying the pending slot request
     * @return True if a pending slot request was found; otherwise false
     */
    @Override
    public boolean unregisterSlotRequest(AllocationID allocationId) {
        checkInit();

        PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

        if (null != pendingSlotRequest) {
            LOG.debug("Cancel slot request {}.", allocationId);

            cancelPendingSlotRequest(pendingSlotRequest);

            return true;
        } else {
            LOG.debug(
                    "No pending slot request with allocation id {} found. Ignoring unregistration request.",
                    allocationId);

            return false;
        }
    }


3.3.16. registerTaskManager

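Registers a TaskManager so that its slots become known to the SlotManager and available for allocation. If a TaskManager with the same InstanceID is already registered, only its slot status is reported and false is returned.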


    /**
     * Registers a new task manager at the slot manager.
     * This will make the task managers slots known and, thus, available for allocation.
     *
     * @param taskExecutorConnection for the new task manager
     * @param initialSlotReport for the new task manager
     * @return True if the task manager has not been registered before and is registered
     *     successfully; otherwise false
     */
    @Override
    public boolean registerTaskManager(
            final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {

        // initialization check (fails with "The slot manager has not been started.")
        checkInit();

        LOG.debug(
                "Registering TaskManager {} under {} at the SlotManager.",
                taskExecutorConnection.getResourceID().getStringWithMetadata(),
                taskExecutorConnection.getInstanceID());

        // we identify task managers by their instance id

        if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {

            // already registered: just report the slot status
            reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
            return false;
        } else {

            if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
                // would registering these slots exceed maxSlotNum?
                LOG.info
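
The excerpt ends inside the maxSlotNum check. The two branches visible so far (a known InstanceID only triggers a slot status report; a new TaskManager must fit under maxSlotNum) can be modeled as below. This is a hypothetical sketch: in particular, the rule in `exceedsMaxAfterAdding` (registered slots + pending TaskManager slots + newly reported slots > maxSlotNum) is our assumption about what isMaxSlotNumExceededAfterRegistration checks, not code taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of registerTaskManager's first checks (not Flink code). */
final class RegistrationSketch {
    private final int maxSlotNum;
    private final int pendingTaskManagerSlots;
    // instance id -> number of slots reported by that TaskManager
    private final Map<String, Integer> registrations = new HashMap<>();
    // how often an already-registered TaskManager only had its slot status reported
    int slotStatusReports;

    RegistrationSketch(int maxSlotNum, int pendingTaskManagerSlots) {
        this.maxSlotNum = maxSlotNum;
        this.pendingTaskManagerSlots = pendingTaskManagerSlots;
    }

    int registeredSlots() {
        int sum = 0;
        for (int n : registrations.values()) {
            sum += n;
        }
        return sum;
    }

    /** Assumed rule: registered + pending + newly reported slots must not exceed maxSlotNum. */
    boolean exceedsMaxAfterAdding(int newSlots) {
        return registeredSlots() + pendingTaskManagerSlots + newSlots > maxSlotNum;
    }

    boolean registerTaskManager(String instanceId, int reportedSlots) {
        if (registrations.containsKey(instanceId)) {
            slotStatusReports++; // known TaskManager: stands in for reportSlotStatus(...)
            return false;
        }
        if (exceedsMaxAfterAdding(reportedSlots)) {
            return false; // over the limit: the TaskManager is not registered
        }
        registrations.put(instanceId, reportedSlots);
        return true;
    }
}
```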