[Flink] Flink resources: Slot and SlotPool

Posted 九师兄


1. Overview

Reposted, with additions, from: http://www.qishunwang.net/news_show_82511.aspx


2.SlotPool

2.1. Introduction

SlotPool is the pool that the JobMaster uses to manage slots. It is an interface that defines the slot management operations.

Its main methods are listed below.

2.1.1. Lifecycle methods

Method   Description
start    Starts the slot pool
suspend  Suspends the slot pool
close    Closes the slot pool

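2.1.2 ResourceManager connection methods

Method                     Description
connectToResourceManager   Connects the slot pool to the ResourceManager
disconnectResourceManager  Disconnects the slot pool from the ResourceManager
registerTaskManager        Registers a TaskExecutor under the given ResourceID
releaseTaskManager         Releases a TaskExecutor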

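2.1.3 Slot operations

Method                               Description
offerSlots                           Offers slots from a TaskExecutor to the slot pool.
failAllocation                       Marks the slot with the given allocation ID as failed.
getAvailableSlotsInformation         Returns information about the slots that are currently available.
getAllocatedSlotsInformation         Returns information about all allocated slots.
allocateAvailableSlot                Allocates the available slot with the given allocation ID under the given request ID. Returns null if no slot with the given allocation ID is available.
requestNewAllocatedSlot              Requests the allocation of a new slot from the ResourceManager. This method does not return a slot that is already available in the pool; it adds a new slot to the pool, which is immediately allocated and returned.
requestNewAllocatedBatchSlot         Requests the allocation of a new batch slot from the ResourceManager. Unlike a normal slot, a batch slot times out only if the slot pool does not contain a suitable slot. It also does not react to failure signals from the ResourceManager.
disableBatchSlotRequestTimeoutCheck  Disables the batch slot request timeout check. Invoked when someone else wants to take over the timeout check responsibility.
createAllocatedSlotReport            Creates a report about the allocated slots that belong to the specified task manager.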

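3. The SlotPoolImpl implementation class

SlotPoolImpl is the implementation of the SlotPool interface.

The slot pool serves the slot requests issued by the ExecutionGraph.

When it cannot serve a slot request, it tries to acquire a new slot from the ResourceManager.

If no ResourceManager is currently available, or the ResourceManager rejects the request, or the request times out, the slot request fails.

The slot pool also holds all the slots that were offered to it and accepted, so it can still serve registered free slots even if the ResourceManager is down.

Slots are released only when they are useless, e.g. when the job is fully running but some slots are still free.

All allocations and slot offers are identified by a self-generated AllocationID, which is used to eliminate ambiguities.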


3.1. Fields


	/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
	private static final long STATUS_LOG_INTERVAL_MS = 60_000;

	private final JobID jobId;

	/** All registered TaskManagers; slots will be accepted and used only if the resource is registered. */
	private final HashSet<ResourceID> registeredTaskManagers;

	/** The book-keeping of all allocated slots (all slots allocated to the current JobManager). */
	private final AllocatedSlots allocatedSlots;

	/** The book-keeping of all available slots (already assigned to this JobManager, but not yet carrying a payload). */
	private final AvailableSlots availableSlots;

	/** All pending requests waiting for slots (the request has already been sent to the ResourceManager). */
	private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

	/**
	 * The requests that are waiting for the resource manager to be connected
	 * (not yet sent to the ResourceManager, because no connection has been established).
	 */
	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

	/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
	private final Time rpcTimeout;

	/** Timeout for releasing idle slots. */
	private final Time idleSlotTimeout;

	/** Timeout for batch slot requests. */
	private final Time batchSlotTimeout;

	private final Clock clock;

	/** the fencing token of the job manager. */
	private JobMasterId jobMasterId;

	/** The gateway to communicate with resource manager. */
	private ResourceManagerGateway resourceManagerGateway;

	private String jobManagerAddress;

	// The main thread executor of the component.
	private ComponentMainThreadExecutor componentMainThreadExecutor;
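
The pendingRequests field is a map that can be addressed by either of two keys: the SlotRequestId or the AllocationID. The following is a minimal sketch of such a dual-key map, written only to illustrate the idea. It is not Flink's DualKeyLinkedMap; put, removeKeyB and keySetB mirror the calls that appear in the listings of this article, while the class name and the remaining methods are assumptions.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative dual-key map; NOT Flink's DualKeyLinkedMap implementation. */
public class DualKeyMapSketch<A, B, V> {
    // Primary index: key A -> (key B, value), kept in insertion order.
    private final Map<A, SimpleImmutableEntry<B, V>> byKeyA = new LinkedHashMap<>();
    // Secondary index: key B -> key A.
    private final Map<B, A> keyBToKeyA = new LinkedHashMap<>();

    public void put(A keyA, B keyB, V value) {
        // Drop any older mapping that uses either key, so both indexes stay consistent.
        removeKeyA(keyA);
        removeKeyB(keyB);
        byKeyA.put(keyA, new SimpleImmutableEntry<>(keyB, value));
        keyBToKeyA.put(keyB, keyA);
    }

    public V getKeyA(A keyA) {
        SimpleImmutableEntry<B, V> entry = byKeyA.get(keyA);
        return entry == null ? null : entry.getValue();
    }

    public V getKeyB(B keyB) {
        A keyA = keyBToKeyA.get(keyB);
        return keyA == null ? null : getKeyA(keyA);
    }

    public V removeKeyA(A keyA) {
        SimpleImmutableEntry<B, V> entry = byKeyA.remove(keyA);
        if (entry == null) {
            return null;
        }
        keyBToKeyA.remove(entry.getKey());
        return entry.getValue();
    }

    public V removeKeyB(B keyB) {
        A keyA = keyBToKeyA.remove(keyB);
        if (keyA == null) {
            return null;
        }
        return byKeyA.remove(keyA).getValue();
    }

    /** Returns a snapshot of the B keys, so callers can iterate while the map changes. */
    public Set<B> keySetB() {
        return new LinkedHashSet<>(keyBToKeyA.keySet());
    }

    public int size() {
        return byKeyA.size();
    }

    public static void main(String[] args) {
        DualKeyMapSketch<String, String, String> pending = new DualKeyMapSketch<>();
        pending.put("request-1", "allocation-1", "pending request 1");
        System.out.println(pending.removeKeyB("allocation-1"));
        System.out.println(pending.size());
    }
}
```

Keeping a second index from key B to key A is what lets the pool find a request by SlotRequestId when the scheduler asks, and by AllocationID when a TaskExecutor offers a slot.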

3.2 Lifecycle methods

Method   Description
start    Starts the slot pool
suspend  Suspends the slot pool
close    Closes the slot pool

3.2.1 The start method

/**
	 * Start the slot pool to accept RPC calls.
	 *
	 * @param jobMasterId The necessary leader id for running the job.
	 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
	 * @param componentMainThreadExecutor The main thread executor for the job master's main thread.
	 */
	public void start(
		@Nonnull JobMasterId jobMasterId,
		@Nonnull String newJobManagerAddress,
		@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {

		this.jobMasterId = jobMasterId;
		this.jobManagerAddress = newJobManagerAddress;
		this.componentMainThreadExecutor = componentMainThreadExecutor;

		// schedule the timeout checks for idle slots and batch slot requests
		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
		scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);

		if (log.isDebugEnabled()) {
			scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
		}
	}

3.2.2 suspend

/**
	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
	 */
	@Override
	public void suspend() {

		componentMainThreadExecutor.assertRunningInMainThread();

		log.info("Suspending SlotPool.");

		// cancel all pending allocations --> we can request these slots
		// again after we regained the leadership
		Set<AllocationID> allocationIds = pendingRequests.keySetB();

		for (AllocationID allocationId : allocationIds) {
			// cancel the slot request at the ResourceManager
			resourceManagerGateway.cancelSlotRequest(allocationId);
		}

		// do not accept any requests
		jobMasterId = null;
		resourceManagerGateway = null;

		// Clear (but not release!) the available slots. The TaskManagers should re-register them
		// at the new leader JobManager/SlotPool
		clear();
	}

3.2.3 close

@Override
	public void close() {
		log.info("Stopping SlotPool.");
		// cancel all pending allocations
		Set<AllocationID> allocationIds = pendingRequests.keySetB();

		for (AllocationID allocationId : allocationIds) {
			resourceManagerGateway.cancelSlotRequest(allocationId);
		}

		// release all registered slots by releasing the corresponding TaskExecutors
		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
			final FlinkException cause = new FlinkException(
				"Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
			releaseTaskManagerInternal(taskManagerResourceId, cause);
		}

		clear();
	}

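3.3 ResourceManager connection methods

Method                     Description
connectToResourceManager   Connects the slot pool to the ResourceManager
disconnectResourceManager  Disconnects the slot pool from the ResourceManager
registerTaskManager        Registers a TaskExecutor under the given ResourceID
releaseTaskManager         Releases a TaskExecutor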

3.3.1 connectToResourceManager

/**
	 * Connects to the ResourceManager and sends off the requests that were waiting for this connection.
	 * @param resourceManagerGateway  The RPC gateway for the resource manager.
	 */
	@Override
	public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

		// work on all slots waiting for this connection
		for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
			// request the slot from the ResourceManager
			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
		}

		// all sent off
		waitingForResourceManager.clear();
	}
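
The interplay between waitingForResourceManager and connectToResourceManager can be sketched as a small queue that holds requests while no gateway is connected and flushes them once one arrives. This is a simplified illustration, not Flink code: the Gateway interface and the class name are hypothetical stand-ins, and the rule that a request is parked whenever the gateway is null is an assumption based on the field comments above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PendingQueueSketch {
    /** Hypothetical stand-in for the ResourceManagerGateway. */
    public interface Gateway {
        void requestSlot(String requestId);
    }

    // Requests that could not be sent yet, in arrival order.
    private final Set<String> waitingForResourceManager = new LinkedHashSet<>();
    // null while no ResourceManager is connected.
    private Gateway gateway;

    public void requestSlot(String requestId) {
        if (gateway == null) {
            // Assumption: park the request until a ResourceManager connects.
            waitingForResourceManager.add(requestId);
        } else {
            gateway.requestSlot(requestId);
        }
    }

    public void connectToResourceManager(Gateway newGateway) {
        this.gateway = newGateway;
        // Send off everything that was waiting for this connection.
        for (String requestId : waitingForResourceManager) {
            newGateway.requestSlot(requestId);
        }
        waitingForResourceManager.clear();
    }

    public void disconnectResourceManager() {
        this.gateway = null;
    }

    public int waitingCount() {
        return waitingForResourceManager.size();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        PendingQueueSketch pool = new PendingQueueSketch();
        pool.requestSlot("request-1");
        pool.connectToResourceManager(sent::add);
        System.out.println(sent);
    }
}
```

Using an insertion-ordered collection keeps the requests in the order in which they were made, which matches the LinkedHashMap used for waitingForResourceManager in the field list.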

3.3.2 disconnectResourceManager

Closes the connection to the ResourceManager.

@Override
	public void disconnectResourceManager() {
		this.resourceManagerGateway = null;
	}

3.3.3 registerTaskManager

/**
	 * Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.
	 * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
	 *
	 * @param resourceID The id of the TaskManager
	 */
	@Override
	public boolean registerTaskManager(final ResourceID resourceID) {

		componentMainThreadExecutor.assertRunningInMainThread();

		log.debug("Register new TaskExecutor {}.", resourceID);
		return registeredTaskManagers.add(resourceID);
	}

3.3.4 releaseTaskManager

/**
	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide not to use slots from it anymore.
	 *
	 * @param resourceId The id of the TaskManager
	 * @param cause for the releasing of the TaskManager
	 */
	@Override
	public boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {

		componentMainThreadExecutor.assertRunningInMainThread();

		if (registeredTaskManagers.remove(resourceId)) {
			releaseTaskManagerInternal(resourceId, cause);
			return true;
		} else {
			return false;
		}
	}

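3.4 Slot operations

Method                               Description
offerSlots                           Offers slots from a TaskExecutor to the slot pool, which accepts or rejects them.
failAllocation                       Marks the slot with the given allocation ID as failed.
getAvailableSlotsInformation         Returns information about the slots that are currently available.
getAllocatedSlotsInformation         Returns information about all allocated slots.
allocateAvailableSlot                Allocates the available slot with the given allocation ID under the given request ID. Returns null if no slot with the given allocation ID is available.
requestNewAllocatedSlot              Requests the allocation of a new slot from the ResourceManager. This method does not return a slot that is already available in the pool; it adds a new slot to the pool, which is immediately allocated and returned.
requestNewAllocatedBatchSlot         Requests the allocation of a new batch slot from the ResourceManager. Unlike a normal slot, a batch slot times out only if the slot pool does not contain a suitable slot. It also does not react to failure signals from the ResourceManager.
disableBatchSlotRequestTimeoutCheck  Disables the batch slot request timeout check. Invoked when someone else wants to take over the timeout check responsibility.
createAllocatedSlotReport            Creates a report about the allocated slots that belong to the specified task manager.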

3.4.1 offerSlots

/**
	 * Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
	 * request waiting for this slot (maybe fulfilled by some other returned slot).
	 *
	 * @param taskManagerLocation location from where the offer comes from
	 * @param taskManagerGateway TaskManager gateway
	 * @param slotOffer the offered slot
	 * @return True if we accept the offering
	 */
	boolean offerSlot(
			final TaskManagerLocation taskManagerLocation,
			final TaskManagerGateway taskManagerGateway,
			final SlotOffer slotOffer) {

		componentMainThreadExecutor.assertRunningInMainThread();

		// check if this TaskManager is valid
		final ResourceID resourceID = taskManagerLocation.getResourceID();
		final AllocationID allocationID = slotOffer.getAllocationId();

		// the slot offer must come from a registered TaskManager
		if (!registeredTaskManagers.contains(resourceID)) {
			log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
					slotOffer.getAllocationId(), taskManagerLocation);
			return false;
		}

		// check whether we are already using this slot, i.e. whether its AllocationID
		// is already known to the SlotPool
		AllocatedSlot existingSlot;
		if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
			(existingSlot = availableSlots.get(allocationID)) != null) {

			// we need to figure out if this is a repeated offer for the exact same slot,
			// or another offer that comes from a different TaskManager after the ResourceManager
			// re-tried the request

			// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
			// the actual slots on the TaskManagers
			// Note: The slotOffer should have the SlotID

			// the slot ID of the slot we already know
			final SlotID existingSlotId = existingSlot.getSlotId();
			// the slot ID of the slot that is being offered
			final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

			// the SlotPool has already accepted this slot, so the TaskExecutor sent a repeated offer
			if (existingSlotId.equals(newSlotId)) {
				log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);

				// return true here so that the sender will get a positive acknowledgement to the retry
				// and mark the offering as a success
				return true;
			} else {
				// the allocation has been fulfilled by another slot, reject the offer so the task executor
				// will offer the slot to the resource manager
				return false;
			}
		}

		// at this point the AllocationID of this slot has not been seen before,
		// so create an AllocatedSlot object that represents the newly allocated slot
		final AllocatedSlot allocatedSlot = new AllocatedSlot(
			allocationID,
			taskManagerLocation,
			slotOffer.getSlotIndex(),
			slotOffer.getResourceProfile(),
			taskManagerGateway);

		// check whether we have a request waiting for this slot
		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
		if (pendingRequest != null) {
			// we were waiting for this!
			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

			// try to complete the pending request
			if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
				// we could not complete the pending slot future --> try to fulfill another pending request
				allocatedSlots.remove(pendingRequest.getSlotRequestId());
				// use the slot to fulfill the other pending requests, in request order
				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
			} else {
				log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
			}
		}
		else {
			// we were actually not waiting for this:
			//   - could be that this request had been fulfilled
			//   - we are receiving the slots from TaskManagers after becoming leaders
			// try to fulfill another pending request with this slot
			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
		}

		// we accepted the request in any case. slot will be released after it idled for
		// too long and timed out
		return true;
	}
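
The accept or reject decision at the top of offerSlot can be reduced to three checks: the TaskManager must be registered, a repeated offer for the same physical slot is acknowledged, and an offer that reuses a known AllocationID from a different slot is rejected. The sketch below illustrates only that decision with plain strings; the class name and the "taskManager_index" slot ID format are assumptions made for the example, and the request-matching step is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OfferDecisionSketch {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    // allocation ID -> slot ID of the slot that was accepted for it.
    private final Map<String, String> acceptedSlots = new HashMap<>();

    public boolean registerTaskManager(String taskManagerId) {
        return registeredTaskManagers.add(taskManagerId);
    }

    /** Returns true if the offer is accepted (or is a repeat of an accepted offer). */
    public boolean offerSlot(String taskManagerId, int slotIndex, String allocationId) {
        // 1. Offers from unregistered TaskManagers are outdated and rejected.
        if (!registeredTaskManagers.contains(taskManagerId)) {
            return false;
        }

        // Hypothetical slot ID format, used only to compare physical slots.
        String newSlotId = taskManagerId + "_" + slotIndex;
        String existingSlotId = acceptedSlots.get(allocationId);

        if (existingSlotId != null) {
            // 2. Same physical slot: repeated offer, acknowledge it again.
            // 3. Different physical slot: the allocation is already fulfilled, reject.
            return existingSlotId.equals(newSlotId);
        }

        // Unknown allocation ID: accept and remember the slot.
        acceptedSlots.put(allocationId, newSlotId);
        return true;
    }

    public static void main(String[] args) {
        OfferDecisionSketch pool = new OfferDecisionSketch();
        pool.registerTaskManager("tm-1");
        System.out.println(pool.offerSlot("tm-1", 0, "alloc-1"));
        System.out.println(pool.offerSlot("tm-2", 0, "alloc-1"));
    }
}
```

Returning true for a repeated offer matters because the sender retries until it gets a positive acknowledgement; rejecting the repeat would make the TaskExecutor give the slot back.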

tryFulfillSlotRequestOrMakeAvailable

/**
	 * Tries to fulfill with the given allocated slot a pending slot request or add the
	 * allocated slot to the set of available slots if no matching request is available.
	 *
	 * @param allocatedSlot which shall be returned
	 */
	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

		// look for a pending request whose resource requirements match this AllocatedSlot
		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

		if (pendingRequest != null) {
			// a matching request exists: hand the AllocatedSlot to it
			log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

			// add the slot to allocatedSlots, which marks it as in use
			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

			// complete the request's future with the allocatedSlot; the allocation is now done
			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
		} else {
			// no matching pending request: the AllocatedSlot becomes available
			log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
		}
	}
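
The "fulfill a pending request, otherwise make the slot available" step can be sketched as follows. Since pollMatchingPendingRequest is not shown in this article, the matching rule here is an assumption: resources are reduced to a memory size in MB, a slot matches a request if it is at least as large as the requirement, and requests are examined in the order they were made. All names in the sketch are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FulfillOrStoreSketch {
    // request ID -> required memory in MB, in request order.
    private final Map<String, Integer> pendingRequests = new LinkedHashMap<>();
    // request ID -> allocation ID of the slot that serves it.
    private final Map<String, String> allocatedSlots = new LinkedHashMap<>();
    // Allocation IDs of slots that are free in the pool.
    private final List<String> availableSlots = new ArrayList<>();

    public void addPendingRequest(String requestId, int requiredMb) {
        pendingRequests.put(requestId, requiredMb);
    }

    /** Returns the ID of the fulfilled request, or null if the slot was stored as available. */
    public String tryFulfillOrMakeAvailable(String allocationId, int slotMb) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> request = it.next();
            // Assumed matching rule: the slot must be at least as large as the requirement.
            if (slotMb >= request.getValue()) {
                String requestId = request.getKey();
                it.remove();
                allocatedSlots.put(requestId, allocationId);
                return requestId;
            }
        }
        // No pending request fits: keep the slot for later requests.
        availableSlots.add(allocationId);
        return null;
    }

    public Map<String, String> allocatedSlots() {
        return allocatedSlots;
    }

    public List<String> availableSlots() {
        return availableSlots;
    }

    public static void main(String[] args) {
        FulfillOrStoreSketch pool = new FulfillOrStoreSketch();
        pool.addPendingRequest("request-1", 2048);
        pool.addPendingRequest("request-2", 512);
        System.out.println(pool.tryFulfillOrMakeAvailable("alloc-1", 1024));
        System.out.println(pool.availableSlots());
    }
}
```

In the example the 1024 MB slot skips request-1, which needs 2048 MB, and serves request-2 instead, so an oversized request at the head of the queue does not block smaller ones.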

3.4.2 failAllocation

@Override
	public Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {

		componentMainThreadExecutor.assertRunningInMainThread();

		// look up (and remove) the pending request for this allocation ID
		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
		if (pendingRequest != null) {
			if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
				// pending batch requests don't react to this signal --> put it back
				pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
			} else {
				// request was still pending
				failPendingRequest(pendingRequest, cause);
			}
			return Optional.empty();
		}
		else {
			return tryFailingAllocatedSlot(allocationID, cause);
		}

		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
	}

tryFailingAllocatedSlot


	private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
		// look up the AllocatedSlot whose allocation failed
		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);

		if (allocatedSlot == null) {
			allocatedSlot = allocatedSlots.remove(allocationID);
		}

		if (allocatedSlot != null) {
			log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());

			// notify TaskExecutor about the failure
			allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
			// release the slot.
			// since it is not in 'allocatedSlots' any more, it will be dropped on return
			allocatedSlot.releasePayload(cause);

			final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();

			if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
				return Optional.of(taskManagerId);
			}
		}

		return Optional.empty();
	}
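
The return value of tryFailingAllocatedSlot is easy to overlook: the TaskManager's ID is returned only when the failed slot was the last one this pool held on that TaskManager. A minimal sketch of that bookkeeping, using two plain maps from allocation ID to TaskManager ID in place of AvailableSlots and AllocatedSlots (the class and method names are assumptions for the example):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class FailSlotSketch {
    // allocation ID -> TaskManager ID, for free and for in-use slots.
    private final Map<String, String> availableSlots = new HashMap<>();
    private final Map<String, String> allocatedSlots = new HashMap<>();

    public void addAvailable(String allocationId, String taskManagerId) {
        availableSlots.put(allocationId, taskManagerId);
    }

    public void addAllocated(String allocationId, String taskManagerId) {
        allocatedSlots.put(allocationId, taskManagerId);
    }

    /** Removes the slot; returns its TaskManager only if no other slot of that TaskManager remains. */
    public Optional<String> failAllocatedSlot(String allocationId) {
        // Look in the free slots first, then in the slots that are in use.
        String taskManagerId = availableSlots.remove(allocationId);
        if (taskManagerId == null) {
            taskManagerId = allocatedSlots.remove(allocationId);
        }

        if (taskManagerId != null
                && !availableSlots.containsValue(taskManagerId)
                && !allocatedSlots.containsValue(taskManagerId)) {
            return Optional.of(taskManagerId);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        FailSlotSketch pool = new FailSlotSketch();
        pool.addAllocated("alloc-1", "tm-1");
        pool.addAvailable("alloc-2", "tm-1");
        System.out.println(pool.failAllocatedSlot("alloc-1"));
        System.out.println(pool.failAllocatedSlot("alloc-2"));
    }
}
```

A caller can use the returned ID to decide that the TaskManager no longer serves this job, since the pool holds none of its slots anymore.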

3.4.3 getAvailableSlotsInformation

Returns information about the available slots.

/**
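	 * Lists the slots that are currently available.
	 * @return the currently available slots, each with the utilization of its TaskExecutor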
	 */
	@Override
	@Nonnull
	public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
		final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
		final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();

		return availableSlotsByTaskManager.entrySet().stream()
			.flatMap(entry -> {
				final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();
				final int numberAvailableSlots = entry.getValue().size();
				final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);

				return entry.getValue().stream().map(slot
				[listing truncated in the source]
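
The utilization computed in the listing above is the share of a TaskExecutor's slots (as known to this pool) that are currently allocated: allocated / (allocated + available). Below is a small stand-alone sketch of that calculation over plain slot counts; the class name, method name and the use of String IDs are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UtilizationSketch {
    /**
     * Computes the utilization for every TaskManager that has at least one available slot.
     * Both maps go from TaskManager ID to a slot count.
     */
    public static Map<String, Double> utilizationByTaskManager(
            Map<String, Integer> availableByTaskManager,
            Map<String, Integer> allocatedByTaskManager) {

        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : availableByTaskManager.entrySet()) {
            int available = entry.getValue();
            // A TaskManager without allocated slots counts as zero allocated.
            int allocated = allocatedByTaskManager.getOrDefault(entry.getKey(), 0);
            // The cast avoids integer division; available is expected to be at least 1 here.
            result.put(entry.getKey(), (double) allocated / (allocated + available));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> available = new LinkedHashMap<>();
        available.put("tm-1", 1);
        Map<String, Integer> allocated = new LinkedHashMap<>();
        allocated.put("tm-1", 3);
        System.out.println(utilizationByTaskManager(available, allocated));
    }
}
```

Only TaskManagers with available slots appear in the result, which mirrors the listing: it iterates over the available slots and looks up the allocated count with a default of an empty set.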
