[Flink] Flink Resources: Slot and SlotPool
Posted 九师兄
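1. Overview
Reposted with additions from: http://www.qishunwang.net/news_show_82511.aspx
2. SlotPool
2.1. Introduction
SlotPool is the pool the JobMaster uses to manage slots. It is an interface that defines the slot-management operations.
Its main methods are:
2.1.1. Lifecycle methods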
Method | Meaning |
---|---|
start | Starts the pool |
suspend | Suspends the pool |
close | Closes the pool |
2.1.2 ResourceManager connection
Method | Meaning |
---|---|
connectToResourceManager | Establishes the connection to the ResourceManager |
disconnectResourceManager | Closes the connection to the ResourceManager |
registerTaskManager | Registers a TaskExecutor under the given ResourceID |
releaseTaskManager | Releases a TaskExecutor |
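2.1.3 Slot operations
Method | Meaning |
---|---|
offerSlots | Accepts slots offered by a TaskExecutor |
failAllocation | Marks the slot with the given allocation ID as failed |
getAvailableSlotsInformation | Gets information about the currently available slots |
getAllocatedSlotsInformation | Gets information about all allocated slots |
allocateAvailableSlot | Allocates the available slot with the given allocation ID under the given request ID. Returns null if no slot with that allocation ID is available. |
requestNewAllocatedSlot | Requests a new slot from the ResourceManager. Instead of returning a slot that is already available in the pool, it adds a new slot to the pool, which is immediately allocated and returned. |
requestNewAllocatedBatchSlot | Requests a new batch slot from the ResourceManager. Unlike a normal slot request, a batch slot request only times out if the slot pool does not contain a suitable slot. It also does not react to failure signals from the ResourceManager. |
disableBatchSlotRequestTimeoutCheck | Disables the batch slot request timeout check. Called when someone else takes over responsibility for the timeout check. |
createAllocatedSlotReport | Creates a report about the allocated slots that belong to the given TaskManager. |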
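3. SlotPoolImpl implementation class
SlotPoolImpl is the implementation of the SlotPool interface.
The slot pool serves the slot requests issued by the ExecutionGraph.
When it cannot serve a slot request, it tries to acquire new slots from the ResourceManager.
If no ResourceManager is currently available, the ResourceManager declines the request, or the request times out, the slot request fails.
The slot pool also keeps every slot that was offered to it and accepted, so it can still hand out registered free slots even if the ResourceManager goes down.
Slots are released only when they are no longer useful, for example when the job is fully running but some free slots remain.
Every allocation or slot offer is identified by a self-generated AllocationID, which is used to resolve ambiguities.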
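The serving order described above can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Flink code: slots and requests are plain strings, and the ResourceManager is modeled as a `Predicate<String>` that accepts or declines a request (null meaning "not connected"). It follows the simplified description above; the fields listed in 3.1 show that the real class additionally buffers requests while no ResourceManager is connected.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model of how a slot pool serves requests (not Flink's SlotPoolImpl code).
final class SlotRequestFlowSketch {
    enum Outcome { SERVED_FROM_POOL, REQUESTED_FROM_RESOURCE_MANAGER, FAILED }

    // Free slots that were offered to the pool earlier and accepted.
    private final Deque<String> freeSlots = new ArrayDeque<>();
    // Requests already sent to the ResourceManager, waiting for a slot offer.
    private final Set<String> pendingRequests = new LinkedHashSet<>();
    // Hypothetical ResourceManager stand-in: test() returns false to decline; null means "not connected".
    private Predicate<String> resourceManager;

    void addFreeSlot(String allocationId) { freeSlots.add(allocationId); }

    void setResourceManager(Predicate<String> resourceManager) { this.resourceManager = resourceManager; }

    boolean isPending(String requestId) { return pendingRequests.contains(requestId); }

    Outcome requestSlot(String requestId) {
        // 1. Serve the request from a free slot the pool already holds, even without a ResourceManager.
        if (freeSlots.poll() != null) {
            return Outcome.SERVED_FROM_POOL;
        }
        // 2. Otherwise ask the ResourceManager for a new slot.
        if (resourceManager != null && resourceManager.test(requestId)) {
            pendingRequests.add(requestId);
            return Outcome.REQUESTED_FROM_RESOURCE_MANAGER;
        }
        // 3. No ResourceManager, or it declined: the request fails.
        return Outcome.FAILED;
    }

    // 4. A pending request that times out is failed and forgotten.
    boolean timeOut(String requestId) { return pendingRequests.remove(requestId); }
}
```

Note that step 1 runs before any ResourceManager check, which is what lets the pool keep serving requests from its registered free slots while the ResourceManager is down.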
3.1. Fields
/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
private static final long STATUS_LOG_INTERVAL_MS = 60_000;
private final JobID jobId;
/** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
private final HashSet<ResourceID> registeredTaskManagers;
/** The book-keeping of all allocated slots, i.e. slots of this JobManager that are in use. */
private final AllocatedSlots allocatedSlots;
/** The book-keeping of all available slots: slots already allocated to this JobManager that do not carry a payload yet. */
private final AvailableSlots availableSlots;
/** All pending requests waiting for slots (these requests have already been sent to the ResourceManager). */
private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
/** The requests that are waiting for the resource manager to be connected (not sent yet, because there is no connection to the ResourceManager). */
private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
private final Time rpcTimeout;
/** Timeout for releasing idle slots. */
private final Time idleSlotTimeout;
/** Timeout for batch slot requests. */
private final Time batchSlotTimeout;
private final Clock clock;
/** the fencing token of the job manager. */
private JobMasterId jobMasterId;
/** The gateway to communicate with resource manager. */
private ResourceManagerGateway resourceManagerGateway;
private String jobManagerAddress;
// Executor for the component's main thread
private ComponentMainThreadExecutor componentMainThreadExecutor;
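`pendingRequests` is a `DualKeyLinkedMap` keyed both by `SlotRequestId` and by `AllocationID`: the code in this article looks requests up by AllocationID (`removeKeyB`, `keySetB`) and re-inserts them with both keys (`put`). The following is a hypothetical sketch of such a two-key, insertion-ordered map; the class and its `getByKeyA`/`getByKeyB` methods are illustrative and are not Flink's API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a map addressable by two keys, in the spirit of Flink's DualKeyLinkedMap
// (method names here are illustrative, not Flink's API).
final class DualKeyMapSketch<A, B, V> {
    private final class Entry {
        final B keyB;
        final V value;

        Entry(B keyB, V value) {
            this.keyB = keyB;
            this.value = value;
        }
    }

    // Primary storage, iterated in insertion order.
    private final Map<A, Entry> byKeyA = new LinkedHashMap<>();
    // Secondary index from key B to key A.
    private final Map<B, A> keyBToKeyA = new HashMap<>();

    void put(A keyA, B keyB, V value) {
        removeKeyA(keyA); // drop any previous mapping for either key
        removeKeyB(keyB);
        byKeyA.put(keyA, new Entry(keyB, value));
        keyBToKeyA.put(keyB, keyA);
    }

    V getByKeyA(A keyA) {
        Entry entry = byKeyA.get(keyA);
        return entry == null ? null : entry.value;
    }

    V getByKeyB(B keyB) {
        A keyA = keyBToKeyA.get(keyB);
        return keyA == null ? null : getByKeyA(keyA);
    }

    V removeKeyA(A keyA) {
        Entry entry = byKeyA.remove(keyA);
        if (entry == null) {
            return null;
        }
        keyBToKeyA.remove(entry.keyB);
        return entry.value;
    }

    V removeKeyB(B keyB) {
        A keyA = keyBToKeyA.get(keyB);
        return keyA == null ? null : removeKeyA(keyA);
    }

    // Snapshot of the B keys in insertion order (e.g. the AllocationIDs of pending requests).
    Set<B> keySetB() {
        Set<B> keys = new LinkedHashSet<>();
        for (Entry entry : byKeyA.values()) {
            keys.add(entry.keyB);
        }
        return keys;
    }

    int size() { return byKeyA.size(); }
}
```

Keeping a single primary map plus a secondary index means both keys always point at the same entry, so removing by either key cleans up both sides.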
3.2 Lifecycle methods
Method | Meaning |
---|---|
start | Starts the pool |
suspend | Suspends the pool |
close | Closes the pool |
3.2.1 start
/**
* Start the slot pool to accept RPC calls.
*
* @param jobMasterId The necessary leader id for running the job.
* @param newJobManagerAddress for the slot requests which are sent to the resource manager
* @param componentMainThreadExecutor The main thread executor for the job master's main thread.
*/
public void start(
@Nonnull JobMasterId jobMasterId,
@Nonnull String newJobManagerAddress,
@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
this.jobMasterId = jobMasterId;
this.jobManagerAddress = newJobManagerAddress;
this.componentMainThreadExecutor = componentMainThreadExecutor;
// schedule the timeout checks for idle slots and batch slot requests
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
if (log.isDebugEnabled()) {
scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}
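`start` schedules `checkIdleSlot` after `idleSlotTimeout`. As an illustration of what such an idle check can do, here is a hypothetical sketch: each available slot remembers when it became free (compare `availableSlots.add(allocatedSlot, clock.relativeTimeMillis())` in `tryFulfillSlotRequestOrMakeAvailable`), and slots that have been free for at least the timeout are released. The class and method names are assumptions; the real method presumably also frees the slot on the TaskExecutor and reschedules itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an idle-slot check (not Flink's checkIdleSlot implementation).
final class IdleSlotCheckSketch {
    // allocation id -> time (ms) at which the slot became available
    private final Map<String, Long> availableSince = new LinkedHashMap<>();

    void makeAvailable(String allocationId, long nowMs) { availableSince.put(allocationId, nowMs); }

    boolean isAvailable(String allocationId) { return availableSince.containsKey(allocationId); }

    // Removes and returns every slot that has been idle for at least idleTimeoutMs.
    List<String> checkIdleSlots(long nowMs, long idleTimeoutMs) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = availableSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= idleTimeoutMs) {
                released.add(entry.getKey()); // a real pool would also tell the TaskExecutor to free it
                it.remove();
            }
        }
        return released;
    }
}
```

Passing the current time in explicitly (instead of reading a clock) mirrors the injected `clock` field and keeps the check deterministic to test.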
3.2.2 suspend
/**
* Suspends this pool, meaning it has lost its authority to accept and distribute slots.
*/
@Override
public void suspend() {
componentMainThreadExecutor.assertRunningInMainThread();
log.info("Suspending SlotPool.");
// cancel all pending allocations --> we can request these slots
// again after we regained the leadership
Set<AllocationID> allocationIds = pendingRequests.keySetB();
for (AllocationID allocationId : allocationIds) {
// ask the ResourceManager to cancel the slot request
resourceManagerGateway.cancelSlotRequest(allocationId);
}
// do not accept any requests
jobMasterId = null;
resourceManagerGateway = null;
// Clear (but not release!) the available slots. The TaskManagers should re-register them
// at the new leader JobManager/SlotPool
clear();
}
3.2.3 close
@Override
public void close() {
log.info("Stopping SlotPool.");
// cancel all pending allocations
Set<AllocationID> allocationIds = pendingRequests.keySetB();
for (AllocationID allocationId : allocationIds) {
resourceManagerGateway.cancelSlotRequest(allocationId);
}
// release all registered slots by releasing the corresponding TaskExecutors
for (ResourceID taskManagerResourceId : registeredTaskManagers) {
final FlinkException cause = new FlinkException(
"Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
releaseTaskManagerInternal(taskManagerResourceId, cause);
}
clear();
}
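The difference between `suspend` and `close` is easy to miss: both cancel the pending allocations at the ResourceManager and call `clear()`, but only `close` releases the registered TaskManagers, while `suspend` merely forgets the slots so the TaskManagers can re-register them with the new leader. The hypothetical sketch below records the ResourceManager and TaskManager calls in lists instead of sending RPCs; it also assumes that `clear()` empties the registered TaskManagers, which is a simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch contrasting suspend() and close(); RPCs are recorded instead of sent.
final class SuspendVsCloseSketch {
    final Set<String> pendingAllocationIds = new LinkedHashSet<>();
    final Set<String> registeredTaskManagers = new LinkedHashSet<>();
    final List<String> cancelledAtResourceManager = new ArrayList<>();
    final List<String> releasedTaskManagers = new ArrayList<>();
    String jobMasterId = "leader-1";

    void suspend() {
        cancelledAtResourceManager.addAll(pendingAllocationIds); // can be re-requested after regaining leadership
        jobMasterId = null;                                       // do not accept any requests
        clear();                                                  // forget slots, but do NOT release them
    }

    void close() {
        cancelledAtResourceManager.addAll(pendingAllocationIds);
        releasedTaskManagers.addAll(registeredTaskManagers);      // release every registered TaskExecutor
        clear();
    }

    private void clear() {
        pendingAllocationIds.clear();
        registeredTaskManagers.clear();
    }
}
```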
3.3 resource manager 连接相关
Method | Meaning |
---|---|
connectToResourceManager | Establishes the connection to the ResourceManager |
disconnectResourceManager | Closes the connection to the ResourceManager |
registerTaskManager | Registers a TaskExecutor under the given ResourceID |
releaseTaskManager | Releases a TaskExecutor |
3.3.1 connectToResourceManager
/**
* Connects to the ResourceManager and processes the requests that were waiting for this connection.
* @param resourceManagerGateway The RPC gateway for the resource manager.
*/
@Override
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
// work on all slots waiting for this connection
for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
// request the slot from the ResourceManager
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
// all sent off
waitingForResourceManager.clear();
}
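The `waitingForResourceManager` buffer works like a small outbox: requests made while no ResourceManager is connected are stashed, and `connectToResourceManager` sends them all, in arrival order, before clearing the buffer. The sketch below is hypothetical; a `Consumer<String>` stands in for the ResourceManager gateway, and `requestNewSlot` is an illustrative name for the code path that either sends or stashes a request.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: slot requests are parked while no ResourceManager is connected
// and flushed, in arrival order, once the connection is established.
final class WaitingForResourceManagerSketch {
    private final Set<String> waitingForResourceManager = new LinkedHashSet<>();
    // Hypothetical stand-in for the ResourceManager gateway; null while disconnected.
    private Consumer<String> resourceManager;

    void requestNewSlot(String requestId) {
        if (resourceManager != null) {
            resourceManager.accept(requestId);        // connected: send right away
        } else {
            waitingForResourceManager.add(requestId); // not connected: stash the request
        }
    }

    void connectToResourceManager(Consumer<String> resourceManager) {
        this.resourceManager = resourceManager;
        for (String requestId : waitingForResourceManager) {
            resourceManager.accept(requestId);        // work on all requests waiting for this connection
        }
        waitingForResourceManager.clear();            // all sent off
    }

    void disconnectResourceManager() { resourceManager = null; }

    int waitingCount() { return waitingForResourceManager.size(); }
}
```

A `LinkedHashSet` keeps arrival order, which matches the `LinkedHashMap` used for the real field.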
3.3.2 disconnectResourceManager
Disconnects from the ResourceManager; the implementation simply drops the gateway reference.
@Override
public void disconnectResourceManager() {
this.resourceManagerGateway = null;
}
3.3.3 registerTaskManager
/**
* Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.
* Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
*
* @param resourceID The id of the TaskManager
*/
@Override
public boolean registerTaskManager(final ResourceID resourceID) {
componentMainThreadExecutor.assertRunningInMainThread();
log.debug("Register new TaskExecutor {}.", resourceID);
return registeredTaskManagers.add(resourceID);
}
3.3.4 releaseTaskManager
/**
* Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
* when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
*
* @param resourceId The id of the TaskManager
* @param cause for the releasing of the TaskManager
*/
@Override
public boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {
componentMainThreadExecutor.assertRunningInMainThread();
if (registeredTaskManagers.remove(resourceId)) {
releaseTaskManagerInternal(resourceId, cause);
return true;
} else {
return false;
}
}
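Registration acts as a gate: `registerTaskManager` and `releaseTaskManager` return the result of `Set.add` and `Set.remove`, offers from unregistered TaskManagers are rejected, and releasing a TaskManager drops every slot that came from it. The hypothetical sketch below models this with string ids; the slot bookkeeping is simplified to an `allocationId -> TaskManager` map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: only slots from registered TaskManagers are accepted,
// and releasing a TaskManager drops all of its slots.
final class TaskManagerRegistrySketch {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final Map<String, String> slotOwner = new HashMap<>(); // allocationId -> TaskManager id

    boolean registerTaskManager(String taskManagerId) {
        return registeredTaskManagers.add(taskManagerId); // false if it was already registered
    }

    boolean releaseTaskManager(String taskManagerId) {
        if (!registeredTaskManagers.remove(taskManagerId)) {
            return false;                                  // unknown TaskManager: nothing to release
        }
        slotOwner.values().removeIf(owner -> owner.equals(taskManagerId)); // drop its slots
        return true;
    }

    boolean offerSlot(String taskManagerId, String allocationId) {
        if (!registeredTaskManagers.contains(taskManagerId)) {
            return false;                                  // outdated offer from an unregistered TaskManager
        }
        slotOwner.put(allocationId, taskManagerId);
        return true;
    }

    int slotCount() { return slotOwner.size(); }
}
```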
3.4 Slot operations
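Method | Meaning |
---|---|
offerSlots | Accepts slots offered by a TaskExecutor |
failAllocation | Marks the slot with the given allocation ID as failed |
getAvailableSlotsInformation | Gets information about the currently available slots |
getAllocatedSlotsInformation | Gets information about all allocated slots |
allocateAvailableSlot | Allocates the available slot with the given allocation ID under the given request ID. Returns null if no slot with that allocation ID is available. |
requestNewAllocatedSlot | Requests a new slot from the ResourceManager. Instead of returning a slot that is already available in the pool, it adds a new slot to the pool, which is immediately allocated and returned. |
requestNewAllocatedBatchSlot | Requests a new batch slot from the ResourceManager. Unlike a normal slot request, a batch slot request only times out if the slot pool does not contain a suitable slot. It also does not react to failure signals from the ResourceManager. |
disableBatchSlotRequestTimeoutCheck | Disables the batch slot request timeout check. Called when someone else takes over responsibility for the timeout check. |
createAllocatedSlotReport | Creates a report about the allocated slots that belong to the given TaskManager. |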
3.4.1 offerSlots
/**
* Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
* transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
* we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
* request waiting for this slot (maybe fulfilled by some other returned slot).
*
* @param taskManagerLocation location from where the offer comes from
* @param taskManagerGateway TaskManager gateway
* @param slotOffer the offered slot
* @return True if we accept the offering
*/
boolean offerSlot(
final TaskManagerLocation taskManagerLocation,
final TaskManagerGateway taskManagerGateway,
final SlotOffer slotOffer) {
componentMainThreadExecutor.assertRunningInMainThread();
// check if this TaskManager is valid
final ResourceID resourceID = taskManagerLocation.getResourceID();
final AllocationID allocationID = slotOffer.getAllocationId();
// the offer must come from a registered TaskManager
if (!registeredTaskManagers.contains(resourceID)) {
log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
slotOffer.getAllocationId(), taskManagerLocation);
return false;
}
// check whether we have already using this slot, i.e. whether its AllocationID is already known to the SlotPool
AllocatedSlot existingSlot;
if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
(existingSlot = availableSlots.get(allocationID)) != null) {
// we need to figure out if this is a repeated offer for the exact same slot,
// or another offer that comes from a different TaskManager after the ResourceManager
// re-tried the request
// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
// the actual slots on the TaskManagers
// Note: The slotOffer should have the SlotID
// SlotID already registered under this AllocationID
final SlotID existingSlotId = existingSlot.getSlotId();
// SlotID of the slot that is being offered now
final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());
// the SlotPool has already accepted this exact slot, so the TaskExecutor sent a repeated offer
if (existingSlotId.equals(newSlotId)) {
log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);
// return true here so that the sender will get a positive acknowledgement to the retry
// and mark the offering as a success
return true;
} else {
// another AllocatedSlot is already associated with this AllocationID, so this slot cannot be accepted
// the allocation has been fulfilled by another slot, reject the offer so the task executor
// will offer the slot to the resource manager
return false;
}
}
// reaching this point means the AllocationID of this slot has not been seen before:
// create a new AllocatedSlot representing the newly allocated slot
final AllocatedSlot allocatedSlot = new AllocatedSlot(
allocationID,
taskManagerLocation,
slotOffer.getSlotIndex(),
slotOffer.getResourceProfile(),
taskManagerGateway);
// check whether we have request waiting for this slot
PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
// we were waiting for this!
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
// try to complete the waiting request
if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
// we could not complete the pending slot future --> try to fulfill another pending request
// completing the future failed, so undo the assignment
allocatedSlots.remove(pendingRequest.getSlotRequestId());
// and try to fulfill other waiting requests with this slot, in request order
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
} else {
log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
}
}
else {
// we were actually not waiting for this:
// - could be that this request had been fulfilled
// - we are receiving the slots from TaskManagers after becoming leaders
// try to fulfill another waiting request, otherwise make the slot available
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
}
// we accepted the request in any case. slot will be released after it idled for
// too long and timed out
return true;
}
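The acceptance rules of `offerSlot` reduce to a small decision table keyed by AllocationID: unregistered TaskManager means reject; AllocationID already known with the same SlotID (resource id plus slot index) means a repeated offer, which is acknowledged; AllocationID known with a different SlotID means reject; otherwise accept as a new slot. The hypothetical sketch below encodes just these rules, with strings for ids and an illustrative `Decision` enum; it needs Java 16+ for the `record`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of offerSlot's acceptance rules (requires Java 16+ for records).
final class SlotOfferDecisionSketch {
    // A SlotID names a physical slot: the TaskManager's resource id plus the slot index.
    record SlotId(String resourceId, int slotIndex) {}

    enum Decision { ACCEPT_NEW, ACCEPT_REPEATED, REJECT_UNREGISTERED, REJECT_OTHER_SLOT }

    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final Map<String, SlotId> slotByAllocationId = new HashMap<>();

    void registerTaskManager(String resourceId) { registeredTaskManagers.add(resourceId); }

    Decision offerSlot(String resourceId, int slotIndex, String allocationId) {
        if (!registeredTaskManagers.contains(resourceId)) {
            return Decision.REJECT_UNREGISTERED;        // outdated offer from an unregistered TaskManager
        }
        SlotId offered = new SlotId(resourceId, slotIndex);
        SlotId existing = slotByAllocationId.get(allocationId);
        if (existing != null) {
            // Same physical slot: a retried offer, acknowledge it.
            // Different slot: the allocation was already fulfilled by another slot, reject.
            return existing.equals(offered) ? Decision.ACCEPT_REPEATED : Decision.REJECT_OTHER_SLOT;
        }
        slotByAllocationId.put(allocationId, offered); // first time this AllocationID is seen
        return Decision.ACCEPT_NEW;
    }
}
```

Comparing SlotIDs rather than AllocationIDs is what distinguishes a harmless retry from a second TaskManager answering the same request after the ResourceManager re-tried it.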
tryFulfillSlotRequestOrMakeAvailable
/**
* Tries to fulfill with the given allocated slot a pending slot request or add the
* allocated slot to the set of available slots if no matching request is available.
*
* @param allocatedSlot which shall be returned
*/
private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
// find a waiting request whose resource requirements match this AllocatedSlot
final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
if (pendingRequest != null) {
// a matching request exists: assign the AllocatedSlot to it
log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
// add the slot to allocatedSlots, marking it as in use
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
// complete the request's future with the slot, which marks the allocation as done
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
// no matching pending request: return the slot to the set of available slots
log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
}
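`tryFulfillSlotRequestOrMakeAvailable` hands a free slot to the first waiting request it can satisfy and otherwise parks it among the available slots. The hypothetical sketch below shows that shape; the two-field `Profile` record and its `covers` check are simplified stand-ins for Flink's resource-profile matching, not its actual semantics, and the method returns the id of the fulfilled request (or null) purely for illustration. Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of "fulfill a waiting request, otherwise make the slot available".
final class FulfillOrMakeAvailableSketch {
    // Simplified resource profile (stand-in for Flink's resource matching).
    record Profile(int cpuMillis, int memoryMb) {
        boolean covers(Profile required) {
            return cpuMillis >= required.cpuMillis() && memoryMb >= required.memoryMb();
        }
    }

    record Request(String id, Profile required) {}

    private final List<Request> pending = new ArrayList<>(); // kept in request order
    private final List<String> available = new ArrayList<>();

    void addPending(Request request) { pending.add(request); }

    int pendingCount() { return pending.size(); }

    List<String> available() { return available; }

    // Returns the id of the request that received the slot, or null if the slot became available.
    String tryFulfillOrMakeAvailable(String slotId, Profile slotProfile) {
        Iterator<Request> it = pending.iterator();
        while (it.hasNext()) {
            Request request = it.next();
            if (slotProfile.covers(request.required())) { // first matching request wins
                it.remove();
                return request.id();
            }
        }
        available.add(slotId);                            // nobody needs it: park it as available
        return null;
    }
}
```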
3.4.2 failAllocation
@Override
public Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {
componentMainThreadExecutor.assertRunningInMainThread();
// remove the pending request (if any) registered under this AllocationID
final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
// pending batch requests don't react to this signal --> put it back
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
} else {
// request was still pending
failPendingRequest(pendingRequest, cause);
}
return Optional.empty();
}
else {
return tryFailingAllocatedSlot(allocationID, cause);
}
// TODO: add some unit tests when the previous two are ready, the allocation may fail at any phase
}
tryFailingAllocatedSlot
private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
// look up the failed slot, first among the available slots, then among the allocated ones
AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
if (allocatedSlot == null) {
allocatedSlot = allocatedSlots.remove(allocationID);
}
if (allocatedSlot != null) {
log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());
// notify TaskExecutor about the failure
allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
// release the slot.
// since it is not in 'allocatedSlots' any more, it will be dropped on return
allocatedSlot.releasePayload(cause);
final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();
if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
return Optional.of(taskManagerId);
}
}
return Optional.empty();
}
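The return value of `tryFailingAllocatedSlot` is the subtle part: it reports the TaskManager's ResourceID only when, after removing the failed slot, that TaskManager no longer holds any available or allocated slot for this pool, presumably so the caller can decide whether to release it. The hypothetical sketch below reproduces just that bookkeeping with string ids; the RPC to free the slot and the payload release are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of tryFailingAllocatedSlot's bookkeeping (RPCs omitted).
final class FailAllocationSketch {
    private final Map<String, String> availableSlots = new HashMap<>(); // allocationId -> TaskManager id
    private final Map<String, String> allocatedSlots = new HashMap<>(); // allocationId -> TaskManager id

    void addAvailable(String allocationId, String taskManagerId) { availableSlots.put(allocationId, taskManagerId); }

    void addAllocated(String allocationId, String taskManagerId) { allocatedSlots.put(allocationId, taskManagerId); }

    Optional<String> failAllocation(String allocationId) {
        String taskManagerId = availableSlots.remove(allocationId);  // look in available slots first
        if (taskManagerId == null) {
            taskManagerId = allocatedSlots.remove(allocationId);      // then in allocated slots
        }
        if (taskManagerId == null) {
            return Optional.empty();                                  // unknown allocation
        }
        // Report the TaskManager only if it has no slots left in this pool.
        boolean stillHasSlots = availableSlots.containsValue(taskManagerId)
                || allocatedSlots.containsValue(taskManagerId);
        return stillHasSlots ? Optional.empty() : Optional.of(taskManagerId);
    }
}
```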
3.4.3 getAvailableSlotsInformation
Gets information about the available slots.
/**
* Lists the currently available slots.
* @return
*/
@Override
@Nonnull
public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();
return availableSlotsByTaskManager.entrySet().stream()
.flatMap(entry -> {
final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();
final int numberAvailableSlots = entry.getValue().size();
final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);
return entry.getValue().stream().map(slot
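The utilization computed in the (truncated) snippet above is, per TaskExecutor, allocated slots divided by allocated plus available slots, and it is only computed for TaskExecutors that appear in the available-slot map, so the denominator is never zero. A hypothetical sketch of just that arithmetic, with TaskManagers identified by strings; for example, 3 allocated and 1 available slot give 3 / (3 + 1) = 0.75.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-TaskExecutor utilization used by getAvailableSlotsInformation.
final class UtilizationSketch {
    // Fraction of a TaskExecutor's slots (held by this pool) that are in use.
    static double utilization(int allocated, int available) {
        return (double) allocated / (allocated + available);
    }

    // Computed only for TaskManagers that have at least one available slot, as in the snippet above.
    static Map<String, Double> perTaskManager(Map<String, Integer> availableByTaskManager,
                                              Map<String, Integer> allocatedByTaskManager) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : availableByTaskManager.entrySet()) {
            int allocated = allocatedByTaskManager.getOrDefault(entry.getKey(), 0);
            result.put(entry.getKey(), utilization(allocated, entry.getValue()));
        }
        return result;
    }
}
```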