Flink 1.1 – ResourceManager

Posted by fxjwind


The role of the Flink ResourceManager is shown in the figure below:

[image: role of the Flink ResourceManager]

 

FlinkResourceManager

/**
 *
 * <h1>Worker allocation steps</h1>
 *
 * <ol>
 *     <li>The resource manager decides to request more workers. This can happen in order
 *         to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
 *
 *     <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
 *         for more containers. After that, the {@link #getNumWorkerRequestsPending()}
 *         should reflect the pending requests.</li>
 *
 *     <li>The concrete framework may acquire containers and then trigger to start TaskManagers
 *         in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
 *
 *     <li>At some point, the TaskManager processes will have started and send a registration
 *         message to the JobManager. The JobManager will perform
 *         a lookup with the ResourceManager to check if it really started this TaskManager.
 *         The method {@link #workerStarted(ResourceID)} will be called
 *         to inform about a registered worker.</li>
 * </ol>
 *
 */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to be notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that have started successfully; the JM sends a notification when a worker starts and registers with it


    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // size of the resource pool

The Javadoc comment above describes the process of requesting resources quite clearly.

As an actor, the ResourceManager mainly handles messages:

    @Override
    protected void handleMessage(Object message) {
        try {
            // --- messages about worker allocation and pool sizes

            if (message instanceof CheckAndAllocateContainers) {
                checkWorkersPool();
            }
            else if (message instanceof SetWorkerPoolSize) {
                SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
                adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
            }
            else if (message instanceof RemoveResource) {
                RemoveResource msg = (RemoveResource) message;
                removeRegisteredResource(msg.resourceId());
            }

            // --- lookup of registered resources

            else if (message instanceof NotifyResourceStarted) {
                NotifyResourceStarted msg = (NotifyResourceStarted) message;
                handleResourceStarted(sender(), msg.getResourceID());
            }

            // --- messages about JobManager leader status and registration

            else if (message instanceof NewLeaderAvailable) {
                NewLeaderAvailable msg = (NewLeaderAvailable) message;
                newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
            }
            else if (message instanceof TriggerRegistrationAtJobManager) {
                TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
                triggerConnectingToJobManager(msg.jobManagerAddress());
            }
            else if (message instanceof RegisterResourceManagerSuccessful) {
                RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
                jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
            }

 

The key methods are the following.

checkWorkersPool

/**
 * This method causes the resource framework master to <b>synchronously</b> re-examine
 * the set of available and pending workers containers, and allocate containers
 * if needed.
 *
 * This method does not automatically release workers, because it is not visible to
 * this resource master which workers can be released. Instead, the JobManager must
 * explicitly release individual workers.
 */
private void checkWorkersPool() {
    int numWorkersPending = getNumWorkerRequestsPending();
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough
    int allAvailableAndPending = startedWorkers.size() +
        numWorkersPending + numWorkersPendingRegistration;

    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {
        requestNewWorkers(missing); // not enough workers, so request new ones
    }
}
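The arithmetic in checkWorkersPool can be sketched in isolation. The following is a minimal stand-alone illustration, not Flink code: the class name WorkerPoolSketch and the method missingWorkers are hypothetical, and plain int parameters stand in for the counters that the real class reads from its own state. Unlike the original, the sketch clamps the result at zero instead of testing `missing > 0` at the call site.

```java
// Hypothetical stand-alone sketch of the pool-size check; not part of Flink.
final class WorkerPoolSketch {

    /**
     * Returns how many workers still need to be requested so that
     * started + pending workers reach the designated pool size.
     * Never negative: surplus workers are not released here.
     */
    static int missingWorkers(int designatedPoolSize,
                              int started,
                              int pendingRequests,
                              int pendingRegistration) {
        int allAvailableAndPending = started + pendingRequests + pendingRegistration;
        return Math.max(0, designatedPoolSize - allAvailableAndPending);
    }

    public static void main(String[] args) {
        // pool of 5, 2 started, 1 requested, 1 awaiting registration
        System.out.println(missingWorkers(5, 2, 1, 1));
        // pool of 3 with 4 workers already accounted for
        System.out.println(missingWorkers(3, 2, 1, 1));
    }
}
```

Counting pending requests and pending registrations along with started workers is what keeps repeated checks from requesting the same containers twice.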

After the JobManager receives a TaskManager's registration message, it notifies the ResourceManager, which leads to a call to handleResourceStarted:

/**
 * Tells the ResourceManager that a TaskManager had been started in a container with the given
 * resource id.
 *
 * @param jobManager The sender (JobManager) of the message
 * @param resourceID The resource id of the started TaskManager
 */
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
    if (resourceID != null) {
        // check if resourceID is already registered (TaskManager may send duplicate register messages)
        WorkerType oldWorker = startedWorkers.get(resourceID);
        if (oldWorker != null) { // check whether this worker is already known
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
        } else {
            WorkerType newWorker = workerStarted(resourceID); // look up the worker

            if (newWorker != null) {
                startedWorkers.put(resourceID, newWorker); // record the new worker
                LOG.info("TaskManager {} has started.", resourceID);
            } else {
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
            }
        }
    }

    // Acknowledge the resource registration
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that registration is complete
}
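The duplicate-tolerant bookkeeping in handleResourceStarted can be illustrated with two maps. This is a rough sketch under assumptions: StartedWorkersSketch, launch and startedCount are invented names, String stands in for both ResourceID and the worker type, and the `launched` map is only a stand-in for whatever the concrete framework's workerStarted implementation consults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of idempotent worker registration; not part of Flink.
final class StartedWorkersSketch {

    // Workers this manager launched but that have not reported back yet.
    private final Map<String, String> launched = new HashMap<>();
    // Workers confirmed as started (the analogue of startedWorkers).
    private final Map<String, String> started = new HashMap<>();

    void launch(String resourceId, String container) {
        launched.put(resourceId, container);
    }

    /** Returns true only the first time a worker launched by this manager is reported. */
    boolean handleResourceStarted(String resourceId) {
        if (resourceId == null || started.containsKey(resourceId)) {
            return false; // missing id or duplicate notification: nothing to do
        }
        String worker = launched.remove(resourceId); // analogue of workerStarted(resourceID)
        if (worker == null) {
            return false; // not started by this manager
        }
        started.put(resourceId, worker);
        return true;
    }

    int startedCount() {
        return started.size();
    }

    public static void main(String[] args) {
        StartedWorkersSketch rm = new StartedWorkersSketch();
        rm.launch("tm-1", "container-1");
        System.out.println(rm.handleResourceStarted("tm-1"));
        System.out.println(rm.handleResourceStarted("tm-1"));
    }
}
```

In the original method the acknowledgement is sent to the JobManager in every case, including duplicates and unknown workers; the sketch leaves the messaging out and only models the map updates.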

 

How resources are allocated to a job:

submitJob builds an ExecutionGraph and eventually calls:

executionGraph.scheduleForExecution(scheduler)

Next, in ExecutionGraph:

public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(slotProvider, allowQueuedScheduling);
        }
    }
}

Then, in ExecutionJobVertex:

public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}

Next, in ExecutionVertex:

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}

Finally, in Execution:

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
        //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the resource asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {
                    try {
                        deployToSlot(simpleSlot); // a slot was obtained, so deploy to it
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });
        
        return true;
    }
    // ... (the branch for a failed state transition is omitted in this excerpt)
}
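The pattern used here (deploy when the slot future completes, and release the slot if deployment fails) can be sketched with the JDK's CompletableFuture. This is only an approximation: the excerpt uses Flink's own Future and BiFunction types, whereas the sketch swaps in java.util.concurrent.CompletableFuture, and the names DeployOnSlotSketch, Slot and Deployer are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "deploy on slot arrival, release on failure"; not part of Flink.
final class DeployOnSlotSketch {

    /** Stand-in for a slot that can be handed back. */
    static final class Slot {
        boolean released;
        void release() { released = true; }
    }

    /** Stand-in for the deployment step, which may fail. */
    interface Deployer {
        void deploy(Slot slot) throws Exception;
    }

    static CompletableFuture<String> schedule(CompletableFuture<Slot> slotFuture, Deployer deployer) {
        // handle() runs for both outcomes: exactly one of slot / failure is non-null here
        return slotFuture.handle((slot, failure) -> {
            if (slot == null) {
                return "FAILED: no slot"; // allocation itself failed
            }
            try {
                deployer.deploy(slot);
                return "DEPLOYED";
            } catch (Exception e) {
                slot.release(); // never leak the slot when deployment fails
                return "FAILED: " + e.getMessage();
            }
        });
    }

    public static void main(String[] args) {
        Slot slot = new Slot();
        System.out.println(schedule(CompletableFuture.completedFuture(slot), s -> { }).join());
    }
}
```

Because the callback is attached with the non-async handle, it runs immediately on the calling thread when the slot future is already complete, which mirrors the "immediate deployment" remark in the original comment.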

 

This calls slotProvider.allocateSlot, where the slotProvider is the Scheduler:

@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);
    if (ret instanceof SimpleSlot) {
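        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means allocation already succeeded, so return a completed future
    }
    else if (ret instanceof Future) {
        return (Future) ret; // a Future means there were not enough resources and the request is still pending, so pass the future on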
    }
    else {
        throw new RuntimeException();
    }
}
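The two outcomes of allocateSlot (an already completed future when a slot is free, a pending future when the request is queued) can be sketched as follows. This is an illustration under assumptions, not Flink's Scheduler: SlotQueueSketch is a hypothetical class, slots are plain strings, and java.util.concurrent.CompletableFuture replaces FlinkCompletableFuture.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of immediate vs. queued slot allocation; not part of Flink.
final class SlotQueueSketch {

    private int freeSlots;
    private int nextId = 0;
    private final Queue<CompletableFuture<String>> waiting = new ArrayDeque<>();

    SlotQueueSketch(int freeSlots) {
        this.freeSlots = freeSlots;
    }

    /** Returns a completed future if a slot is free, otherwise a pending one (if queuing is allowed). */
    synchronized CompletableFuture<String> allocateSlot(boolean allowQueued) {
        if (freeSlots > 0) {
            freeSlots--;
            return CompletableFuture.completedFuture("slot-" + nextId++);
        }
        if (!allowQueued) {
            throw new IllegalStateException("no resource available");
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.add(future);
        return future;
    }

    /** A slot became available: hand it to the oldest waiter, or keep it as free. */
    void newSlotAvailable() {
        CompletableFuture<String> next;
        String slot;
        synchronized (this) {
            next = waiting.poll();
            if (next == null) {
                freeSlots++;
                return;
            }
            slot = "slot-" + nextId++;
        }
        next.complete(slot); // complete outside the lock so callbacks do not run while holding it
    }

    public static void main(String[] args) {
        SlotQueueSketch scheduler = new SlotQueueSketch(1);
        System.out.println(scheduler.allocateSlot(true).isDone());
        CompletableFuture<String> queued = scheduler.allocateSlot(true);
        System.out.println(queued.isDone());
        scheduler.newSlotAvailable();
        System.out.println(queued.isDone());
    }
}
```

The caller can treat both cases uniformly by attaching its deployment callback to the returned future, which is why the real method wraps an immediately available SimpleSlot in a completed future.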

 

scheduleTask

/**
     * Returns either a {@link SimpleSlot}, or a {@link Future}.
     */
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

        final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
        
        final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
        final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
                                    preferredLocations != null && preferredLocations.iterator().hasNext();
    
        synchronized (globalLock) { // global lock

            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

            if (sharingUnit != null) { // the task uses slot sharing

                // 1)  === If the task has a slot sharing group, schedule with shared slots ===
                
                final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
                final CoLocationConstraint constraint = task.getLocationConstraint();
                
                // get a slot from the group, if the group has one for us (and can fulfill the constraint)
                final SimpleSlot slotFromGroup;
                if (constraint == null) {
                    slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing ones
                }
                else {
                    slotFromGroup = assignment.getSlotForTask(vertex, constraint);
                }

                SimpleSlot newSlot = null;
                SimpleSlot toUse = null;

                // the following needs to make sure any allocated slot is released in case of an error
                try {
                    
                    // check whether the slot from the group is already what we want.
                    // any slot that is local, or where the assignment was unconstrained is good!
                    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                        updateLocalityCounters(slotFromGroup, vertex);
                        return slotFromGroup; // a suitable slot exists already, so return it
                    }

                    // the group did not have a local slot for us. see if we can get one (or a better one)
                    // note: `locations` and `localOnly` are set up in code omitted from this excerpt
                    newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                    
                    if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                        // if there is no slot from the group, or the new slot is local,
                        // then we use the new slot
                        if (slotFromGroup != null) {
                            slotFromGroup.releaseSlot();
                        }
                        toUse = newSlot;
                    }
                    else {
                        // both are available and usable. neither is local. in that case, we may
                        // as well use the slot from the sharing group, to minimize the number of
                        // instances that the job occupies
                        newSlot.releaseSlot();
                        toUse = slotFromGroup;
                    }

                    // if this is the first slot for the co-location constraint, we lock
                    // the location, because we are going to use that slot
                    if (constraint != null && !constraint.isAssigned()) {
                        constraint.lockLocation();
                    }
                    
                    updateLocalityCounters(toUse, vertex);
                }
                
                return toUse; // return the allocated slot
            }
            else { // no slot sharing; this case is simpler

                // 2) === schedule without hints and sharing ===

                SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // request a slot directly
                if (slot != null) {
                    updateLocalityCounters(slot, vertex);
                    return slot; // a slot was obtained, so return it
                }
                else {
                    // no resource available now, so queue the request
                    if (queueIfNoResource) { // queuing is allowed
                        CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                        this.taskQueue.add(new QueuedTask(task, future)); // queue the task and return the future, meaning the request is asynchronous
                        return future;
                    }
                }
            }
        }
    }

Let us look directly at the case without slot sharing.

It calls getFreeSlotForTask:

/**
     * Gets a suitable instance to schedule the vertex execution to.
     * <p>
     * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
     *
     * @param vertex The task to run.
     * @return The instance to run the vertex on, or {@code null}, if no instance is available.
     */
    protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                            Iterable<TaskManagerLocation> requestedLocations,
                                            boolean localOnly) {
        // we need potentially to loop multiple times, because there may be false positives
        // in the set-with-available-instances
        while (true) {
            Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that can provide a slot and satisfies the location constraints

            if (instanceLocalityPair == null){
                return null; // no suitable instance, so allocation fails
            }

            Instance instanceToUse = instanceLocalityPair.getLeft();
            Locality locality = instanceLocalityPair.getRight();

            try {
                SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

                // if the instance has further available slots, re-add it to the set of available resources.
                if (instanceToUse.hasResourcesAvailable()) { // the instance still has resources, so put it back into instancesWithAvailableResources for later allocations
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
                }

                if (slot != null) {
                    slot.setLocality(locality);
                    return slot; // success, return the slot
                }
            }
            catch (InstanceDiedException e) {
                // the instance died it has not yet been propagated to this scheduler
                // remove the instance from the set of available instances
                removeInstance(instanceToUse);
            }
            
            // if we failed to get a slot, fall through the loop
        }
    }

 

findInstance

/**
     * Tries to find a requested instance. If no such instance is available it will return a non-
     * local instance. If no such instance exists (all slots occupied), then return null.
     * 
     * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
     *
     * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
     *                           no locality preference exists.   
     * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.  
     */
    private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {
        
        // drain the queue of newly available instances
        while (this.newlyAvailableInstances.size() > 0) { // move the newlyAvailableInstances into instancesWithAvailableResources
            Instance queuedInstance = this.newlyAvailableInstances.poll();
            if (queuedInstance != null) {
                this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
            }
        }
        
        // if nothing is available at all, return null
        if (this.instancesWithAvailableResources.isEmpty()) { // no instance has available resources, so fail right away
            return null;
        }

        Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

        if (locations != null && locations.hasNext()) { // look for an instance following the locality preferences in order
            // we have a locality preference

            while (locations.hasNext()) {
                TaskManagerLocation location = locations.next();
                if (location != null) {
                    Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                    if (instance != null) {
                        return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                    }
                }
            }
            
            // no local instance available
            if (localOnly) {
                return null;
            }
            else {
                // take the first instance from the instances with resources
                Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
                Instance instanceToUse = instances.next();
                instances.remove();

                return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
            }
        }
        else {
            // no location preference, so use some instance
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
        }
    }
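The selection order in findInstance (preferred locations first, then any instance unless localOnly is set) can be reproduced in a small stand-alone sketch. The names FindInstanceSketch and Choice are hypothetical, strings stand in for Instance and ResourceID, and a LinkedHashMap is an assumption made here so that "the first instance" is deterministic.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of locality-aware instance selection; not part of Flink.
final class FindInstanceSketch {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    /** Stand-in for Pair<Instance, Locality>. */
    static final class Choice {
        final String instance;
        final Locality locality;
        Choice(String instance, Locality locality) {
            this.instance = instance;
            this.locality = locality;
        }
    }

    /** Picks an instance and removes it from the map of available instances, or returns null. */
    static Choice findInstance(Map<String, String> available, List<String> preferred, boolean localOnly) {
        if (available.isEmpty()) {
            return null; // nothing available at all
        }
        if (preferred != null && !preferred.isEmpty()) {
            for (String resourceId : preferred) {
                String instance = available.remove(resourceId);
                if (instance != null) {
                    return new Choice(instance, Locality.LOCAL);
                }
            }
            // no preferred instance is available
            return localOnly ? null : takeFirst(available, Locality.NON_LOCAL);
        }
        // no location preference, so any instance will do
        return takeFirst(available, Locality.UNCONSTRAINED);
    }

    private static Choice takeFirst(Map<String, String> available, Locality locality) {
        Iterator<String> it = available.values().iterator();
        String instance = it.next();
        it.remove();
        return new Choice(instance, locality);
    }

    public static void main(String[] args) {
        Map<String, String> available = new LinkedHashMap<>();
        available.put("tm-1", "hostA");
        available.put("tm-2", "hostB");
        Choice choice = findInstance(available, Arrays.asList("tm-2"), false);
        System.out.println(choice.instance + " " + choice.locality);
    }
}
```

As in the original, the chosen instance is removed from the map; the caller is expected to put it back if it still has free slots after the allocation.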

 

Continuing from there: where do the newlyAvailableInstances come from?

@Override
public void newInstanceAvailable(Instance instance) {
    
    // synchronize globally for instance changes
    synchronized (this.globalLock) {
        
        try {
            // make sure we get notifications about slots becoming available
            instance.setSlotAvailabilityListener(this); // register the Scheduler as the Instance's SlotAvailabilityListener
            
            // store the instance in the by-host-lookup
            String instanceHostName = instance.getTaskManagerLocation().getHostname();
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
            if (instanceSet == null) {
                instanceSet = new HashSet<Instance>();
                allInstancesByHost.put(instanceHostName, instanceSet);
            }
            instanceSet.add(instance);

            // add it to the available resources and let potential waiters know
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // add it to instancesWithAvailableResources

            // add all slots as available
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        }
    }
}

 

    @Override
    public void newSlotAvailable(final Instance instance) {
        
        // WARNING: The asynchrony here is necessary, because  we cannot guarantee the order
        // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
        // 
        // -> The scheduler needs to grab them (1) global scheduler lock
        //                                     (2) slot/instance lock
        // -> The slot releasing grabs (1) slot/instance (for releasing) and
        //                             (2) scheduler (to check whether to take a new task item
        // 
        // that leads with a high probability to deadlocks, when scheduling fast

        this.newlyAvailableInstances.add(instance); // add it to newlyAvailableInstances

        Futures.future(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                handleNewSlot(); // asynchronously process the queued tasks: a new slot means a queued task can now run
                return null;
            }
        }, executionContext);
    }
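The hand-off described in the warning comment (record the instance in a concurrent queue, then take the global lock only on another thread) can be sketched like this. It is a simplified illustration with assumed names: AsyncSlotHandoffSketch, queueTask and deployed are hypothetical, strings stand in for instances and tasks, and a java.util.concurrent.Executor replaces the Akka Futures.future call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

// Hypothetical sketch of the asynchronous new-slot hand-off; not part of Flink.
final class AsyncSlotHandoffSketch {

    private final Object globalLock = new Object();
    // Written without the global lock, so it must be a concurrent queue.
    private final Queue<String> newlyAvailableInstances = new ConcurrentLinkedQueue<>();
    // Both guarded by globalLock.
    private final Queue<String> taskQueue = new ArrayDeque<>();
    private final List<String> deployed = new ArrayList<>();
    private final Executor executor;

    AsyncSlotHandoffSketch(Executor executor) {
        this.executor = executor;
    }

    void queueTask(String task) {
        synchronized (globalLock) {
            taskQueue.add(task);
        }
    }

    /** May be called while the caller holds an instance lock; never takes globalLock itself. */
    void newSlotAvailable(String instance) {
        newlyAvailableInstances.add(instance);
        executor.execute(this::handleNewSlot);
    }

    /** Runs on the executor and is the only place that pairs queued tasks with new slots. */
    private void handleNewSlot() {
        synchronized (globalLock) {
            if (taskQueue.isEmpty()) {
                return; // nobody is waiting; the instance stays queued for later lookups
            }
            String instance = newlyAvailableInstances.poll();
            if (instance != null) {
                deployed.add(taskQueue.poll() + "@" + instance);
            }
        }
    }

    List<String> deployed() {
        synchronized (globalLock) {
            return new ArrayList<>(deployed);
        }
    }

    public static void main(String[] args) {
        // A same-thread executor keeps the demo deterministic; a real setup would use a thread pool.
        AsyncSlotHandoffSketch scheduler = new AsyncSlotHandoffSketch(Runnable::run);
        scheduler.queueTask("task-1");
        scheduler.newSlotAvailable("tm-1");
        System.out.println(scheduler.deployed());
    }
}
```

With a real thread pool, the thread that releases a slot returns without ever waiting for the scheduler lock, so the two lock orders listed in the comment can no longer meet on one thread.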

 

newInstanceAvailable, in turn, is called from the InstanceManager:

    private void notifyNewInstance(Instance instance) {
        synchronized (this.instanceListeners) {
            for (InstanceListener listener : this.instanceListeners) {
                try {
                    listener.newInstanceAvailable(instance);
                }
                catch (Throwable t) {
                    LOG.error("Notification of new instance availability failed.", t);
                }
            }
        }
    }

notifyNewInstance is called from registerTaskManager.

registerTaskManager is called in the JobManager when a TaskManager registers:

case msg @ RegisterTaskManager(
      resourceId,
      connectionInfo,
      hardwareInformation,
      numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {
    case Some(rm) => // a ResourceManager is present
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) // notify the ResourceManager that a resource has started successfully
  }

  // ResourceManager is told about the resource, now let's try to register TaskManager
  if (instanceManager.isRegistered(resourceId)) { // already registered
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(
      AlreadyRegistered(
        instanceID,
        libraryCacheManager.getBlobServerPort))
  } else { // a new resource
    try {
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      val instanceID = instanceManager.registerTaskManager( // register the TaskManager with the InstanceManager
        taskManagerGateway,
        connectionInfo,
        hardwareInformation,
        numberOfSlots)

      taskManagerMap.put(taskManager, instanceID) // record the TaskManager in the JobManager

      taskManager ! decorateMessage(
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // tell the TaskManager that registration is complete

      // to be notified when the taskManager is no longer reachable
      context.watch(taskManager)
    }
  }

 

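This version does not yet implement the architecture shown in the figure at the top of this post.

For now, a TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.

The ResourceManager currently only does bookkeeping.

The ResourceManager has not been separated out of the JobManager.

The architecture is still the following:

[image: current architecture, in which TaskManagers register with the JobManager]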
