[Flink] Flink 1.12.2 SlotManager
Posted by 九师兄
1. Overview
Reposted from: "Flink 1.12.2 Source Code Analysis: SlotManager"
The SlotManager lives inside the ResourceManager.
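The SlotManager maintains a view of all registered TaskManager slots, their allocations, and all pending slot requests.
Whenever a new slot is registered or an allocated slot is freed, it tries to fulfill another pending slot request. Whenever there are not enough free slots, the SlotManager notifies the ResourceManager via ResourceActions#allocateResource(WorkerResourceSpec).
To free resources and avoid resource leaks, idle TaskManagers (TaskManagers whose slots are all currently unused) and pending slot requests time out. A timed-out idle TaskManager is released, and a timed-out pending slot request is failed.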
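The idle-TaskManager timeout rule can be illustrated with a small sketch. `IdleTimeoutTracker` is a hypothetical helper, not Flink code. It records, in milliseconds, when each TaskManager became idle and reports those that have stayed idle for at least the timeout. Pending slot requests could be timed out the same way, keyed by allocation id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: remembers since when each TaskManager has been idle. */
class IdleTimeoutTracker {
    private final long timeoutMillis;
    // instance id -> timestamp (ms) at which the TaskManager became idle
    private final Map<String, Long> idleSince = new HashMap<>();

    IdleTimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Call when the last allocated slot of a TaskManager has been freed. */
    void markIdle(String instanceId, long nowMillis) {
        idleSince.put(instanceId, nowMillis);
    }

    /** Call when one of the TaskManager's slots is allocated again. */
    void markBusy(String instanceId) {
        idleSince.remove(instanceId);
    }

    /** Returns the TaskManagers that have been idle for at least the timeout. */
    List<String> findTimedOut(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> e : idleSince.entrySet()) {
            if (nowMillis - e.getValue() >= timeoutMillis) {
                timedOut.add(e.getKey());
            }
        }
        return timedOut;
    }
}
```

A periodic check, like the one that `start` schedules below, would call `findTimedOut` on every tick and release the returned TaskManagers.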
2. SlotManager
2.1. Introduction
SlotManager is an interface that defines the slot-management operations of the ResourceManager.
2.2. Methods
Method | Description |
---|---|
start | Starts the slot manager |
suspend | Suspends the slot manager |
int getNumberRegisteredSlots() | Number of registered slots |
int getNumberRegisteredSlotsOf(InstanceID instanceId) | Number of registered slots of the given TaskManager instance |
int getNumberFreeSlots() | Number of free slots |
int getNumberFreeSlotsOf(InstanceID instanceId) | Number of free slots of the given TaskManager instance |
Map<WorkerResourceSpec, Integer> getRequiredResources() | Number of workers, per WorkerResourceSpec, that the SlotManager has requested via ResourceActions and that are not yet fulfilled |
ResourceProfile getRegisteredResource() | Total registered resources |
ResourceProfile getRegisteredResourceOf(InstanceID instanceID) | Registered resources of the given instance |
ResourceProfile getFreeResource() | Total free resources |
ResourceProfile getFreeResourceOf(InstanceID instanceID) | Free resources of the given instance |
int getNumberPendingSlotRequests() | Number of pending slot requests |
void processResourceRequirements(ResourceRequirements resourceRequirements) | Notifies the slot manager about the resource requirements of a job |
boolean registerSlotRequest(SlotRequest slotRequest) | Registers a slot request |
boolean unregisterSlotRequest(AllocationID allocationId) | Cancels a pending slot request |
registerTaskManager | Registers a TaskManager |
unregisterTaskManager | Unregisters a TaskManager |
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) | Reports the slot status of a TaskManager |
void freeSlot(SlotID slotId, AllocationID allocationId) | Frees a slot |
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) | Sets whether unfulfillable slot requests fail immediately instead of staying pending |
3. SlotManagerImpl
SlotManagerImpl is an implementation of the SlotManager interface.
3.1. Fields
/** Map for all registered slots. */
private final HashMap<SlotID, TaskManagerSlot> slots;
/** Index of all currently free slots. */
private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
/** All currently registered task managers. */
private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
/** Map of fulfilled and active allocations for request deduplication purposes. */
private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
/** Map of pending/unfulfilled slot allocation requests. */
private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
/** Scheduled executor for timeouts. */
private final ScheduledExecutor scheduledExecutor;
/** Timeout for slot requests to the task manager. */
private final Time taskManagerRequestTimeout;
/** Timeout after which an allocation is discarded. */
private final Time slotRequestTimeout;
/** Timeout after which an unused TaskManager is released. */
private final Time taskManagerTimeout;
private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
private final SlotMatchingStrategy slotMatchingStrategy;
/** ResourceManager's id. */
private ResourceManagerId resourceManagerId;
/** Executor for future callbacks which have to be "synchronized". */
private Executor mainThreadExecutor;
/** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;
private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
private ScheduledFuture<?> slotRequestTimeoutCheck;
/** True iff the component has been started. */
private boolean started;
/**
* Release task executor only when each produced result partition is either consumed or failed.
*/
private final boolean waitResultConsumedBeforeRelease;
/** Defines the max limitation of the total number of slots. */
private final int maxSlotNum;
/** Defines the number of redundant taskmanagers. */
private final int redundantTaskManagerNum;
/**
* If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request
* to pend. A slot request is considered unfulfillable if it cannot be fulfilled by neither a
* slot that is already registered (including allocated ones) nor a pending slot that the {@link
* ResourceActions} can allocate.
*/
private boolean failUnfulfillableRequest = true;
/** The default resource spec of workers to request. */
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final ResourceProfile defaultSlotResourceProfile;
private final SlotManagerMetricGroup slotManagerMetricGroup;
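The `slots` and `freeSlots` fields act as two indexes over the same slots. The sketch below uses a hypothetical `SlotIndex` with String slot ids and a plain integer "size" in place of `SlotID` and `ResourceProfile`. A free slot appears in both maps, and allocating it removes it only from the free index. Because `freeSlots` is a `LinkedHashMap`, a first-fit search visits free slots in a predictable insertion order. That search is similar in spirit to a first-match strategy such as Flink's `AnyMatchingSlotMatchingStrategy`, but it is only an assumption here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the slots/freeSlots indexes of SlotManagerImpl. */
class SlotIndex {
    // every registered slot: slot id -> resource size (arbitrary units)
    private final Map<String, Integer> slots = new HashMap<>();
    // the free subset of 'slots'; LinkedHashMap keeps insertion order
    private final LinkedHashMap<String, Integer> freeSlots = new LinkedHashMap<>();

    /** A newly registered slot goes into both indexes because it starts out free. */
    void registerSlot(String slotId, int resources) {
        slots.put(slotId, resources);
        freeSlots.put(slotId, resources);
    }

    /** First fit: takes the first free slot, in insertion order, that is large enough. */
    Optional<String> allocate(int requested) {
        String match = null;
        for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
            if (e.getValue() >= requested) {
                match = e.getKey();
                break;
            }
        }
        if (match == null) {
            return Optional.empty();
        }
        freeSlots.remove(match); // the slot stays in 'slots'; it is just no longer free
        return Optional.of(match);
    }

    /** A freed slot goes back into the free index (at the end, if it was not free already). */
    void free(String slotId) {
        Integer resources = slots.get(slotId);
        if (resources != null) {
            freeSlots.put(slotId, resources);
        }
    }

    int getNumberRegisteredSlots() {
        return slots.size();
    }

    int getNumberFreeSlots() {
        return freeSlots.size();
    }
}
```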
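3.2. Constructor
The constructor only assigns and initializes fields. It copies timeouts and limits from SlotManagerConfiguration, derives the default per-slot resource profile, and creates empty maps. The runtime collaborators (resourceManagerId, resourceActions, mainThreadExecutor) stay null until start() is called.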
public SlotManagerImpl(
ScheduledExecutor scheduledExecutor,
SlotManagerConfiguration slotManagerConfiguration,
SlotManagerMetricGroup slotManagerMetricGroup) {
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
Preconditions.checkNotNull(slotManagerConfiguration);
this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
this.waitResultConsumedBeforeRelease =
slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
this.defaultSlotResourceProfile =
generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();
slots = new HashMap<>(16);
freeSlots = new LinkedHashMap<>(16);
taskManagerRegistrations = new HashMap<>(4);
fulfilledSlotRequests = new HashMap<>(16);
pendingSlotRequests = new HashMap<>(16);
pendingSlots = new HashMap<>(16);
resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
taskManagerTimeoutsAndRedundancyCheck = null;
slotRequestTimeoutCheck = null;
started = false;
}
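The constructor derives `defaultSlotResourceProfile` from `defaultWorkerResourceSpec` and `numSlotsPerWorker`. The body of `generateDefaultSlotResourceProfile` is not shown, so the sketch below assumes that each resource dimension of a worker is split evenly across its slots. `SimpleProfile` is hypothetical and tracks only CPU cores and memory in MB. Flink's real `ResourceProfile` has more dimensions.

```java
/** Hypothetical two-dimensional resource profile (Flink's ResourceProfile has more fields). */
final class SimpleProfile {
    final double cpuCores;
    final long memoryMb;

    SimpleProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Assumed rule: split a worker's resources evenly across its slots (memory rounded down). */
    static SimpleProfile perSlot(SimpleProfile worker, int numSlotsPerWorker) {
        if (numSlotsPerWorker <= 0) {
            throw new IllegalArgumentException("numSlotsPerWorker must be positive");
        }
        return new SimpleProfile(
                worker.cpuCores / numSlotsPerWorker,
                worker.memoryMb / numSlotsPerWorker);
    }
}
```

For example, a worker with 4 cores and 4096 MB split into 4 slots gives each slot 1 core and 1024 MB.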
3.3. Methods
3.3.1. start
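Starts the slot manager. It stores the ResourceManager id, the main-thread executor and the resource actions, then schedules two periodic checks: one for TaskManager timeouts and redundancy, and one for slot request timeouts.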
/**
* Starts the slot manager with the given leader id and resource manager actions.
*
* @param newResourceManagerId to use for communication with the task managers
* @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
* @param newResourceActions to use for resource (de-)allocations
*/
@Override
public void start(
ResourceManagerId newResourceManagerId,
Executor newMainThreadExecutor,
ResourceActions newResourceActions) {
LOG.info("Starting the SlotManager.");
this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceActions = Preconditions.checkNotNull(newResourceActions);
started = true;
taskManagerTimeoutsAndRedundancyCheck =
scheduledExecutor.scheduleWithFixedDelay(
() ->
mainThreadExecutor.execute(
() -> checkTaskManagerTimeoutsAndRedundancy()),
0L,
taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
slotRequestTimeoutCheck =
scheduledExecutor.scheduleWithFixedDelay(
() -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
0L,
slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
registerSlotManagerMetrics();
}
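Each periodic check in `start` is scheduled on `scheduledExecutor`, but a tick does not run the check directly. It hands the check to `mainThreadExecutor`, so the check always runs in the ResourceManager's main thread and does not need its own locking. The sketch below reproduces that pattern with plain `java.util.concurrent` classes. `PeriodicCheckDemo` and the thread name `rm-main` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of "timer thread schedules, main thread executes". */
class PeriodicCheckDemo {
    /** Runs a periodic check until it has fired 'times' times; returns the threads it ran on. */
    static List<String> runChecks(int times, long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "rm-main"));
        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch fired = new CountDownLatch(times);

        // The timer thread only enqueues work; the check body itself runs on "rm-main".
        ScheduledFuture<?> check = timer.scheduleWithFixedDelay(
                () -> mainThread.execute(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    fired.countDown();
                }),
                0L, delayMillis, TimeUnit.MILLISECONDS);
        try {
            fired.await(5, TimeUnit.SECONDS);
            check.cancel(false);   // stop further ticks, as suspend() does
            timer.shutdown();
            timer.awaitTermination(5, TimeUnit.SECONDS);
            mainThread.shutdown(); // let already-enqueued checks finish
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
        synchronized (threadNames) {
            return new ArrayList<>(threadNames);
        }
    }
}
```

Every recorded thread name is `rm-main`, even though the scheduling happens on the timer thread.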
3.3.2. suspend
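Suspends the slot manager. It cancels both periodic checks, cancels all pending slot requests, unregisters every TaskManager, and resets resourceManagerId and resourceActions.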
/** Suspends the component. This clears the internal state of the slot manager. */
@Override
public void suspend() {
LOG.info("Suspending the SlotManager.");
// stop the timeout checks for the TaskManagers and the SlotRequests
if (taskManagerTimeoutsAndRedundancyCheck != null) {
taskManagerTimeoutsAndRedundancyCheck.cancel(false);
taskManagerTimeoutsAndRedundancyCheck = null;
}
if (slotRequestTimeoutCheck != null) {
slotRequestTimeoutCheck.cancel(false);
slotRequestTimeoutCheck = null;
}
for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
cancelPendingSlotRequest(pendingSlotRequest);
}
pendingSlotRequests.clear();
ArrayList<InstanceID> registeredTaskManagers =
new ArrayList<>(taskManagerRegistrations.keySet());
for (InstanceID registeredTaskManager : registeredTaskManagers) {
unregisterTaskManager(
registeredTaskManager,
new SlotManagerException("The slot manager is being suspended."));
}
resourceManagerId = null;
resourceActions = null;
started = false;
}
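`suspend` copies `taskManagerRegistrations.keySet()` into an `ArrayList` before looping, because `unregisterTaskManager` removes entries from that same map. Removing from a `HashMap` while iterating its live key set typically fails fast with a `ConcurrentModificationException`. The hypothetical `Registry` below contrasts the two approaches.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry showing why suspend() iterates over a copy of the key set. */
class Registry {
    // instance id -> number of slots
    final Map<String, Integer> taskManagers = new HashMap<>();

    void unregister(String instanceId) {
        taskManagers.remove(instanceId); // mutates the map that is being iterated
    }

    /** Safe: iterate over a snapshot of the keys, like SlotManagerImpl#suspend. */
    void unregisterAllSafely() {
        List<String> ids = new ArrayList<>(taskManagers.keySet());
        for (String id : ids) {
            unregister(id);
        }
    }

    /** Unsafe: iterates the live key set; returns true if the iterator failed fast. */
    boolean unregisterAllUnsafely() {
        try {
            for (String id : taskManagers.keySet()) {
                unregister(id);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```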
3.3.3. int getNumberRegisteredSlots()
Returns the number of registered slots.
@Override
public int getNumberRegisteredSlots() {
return slots.size();
}
3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)
Returns the number of registered slots of the given TaskManager instance, or 0 if the instance is unknown.
@Override
public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
if (taskManagerRegistration != null) {
return taskManagerRegistration.getNumberRegisteredSlots();
} else {
return 0;
}
}
3.3.5. int getNumberFreeSlots()
Returns the number of free slots.
@Override
public int getNumberFreeSlots() {
return freeSlots.size();
}
3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)
Returns the number of free slots of the given TaskManager instance, or 0 if the instance is unknown.
@Override
public int getNumberFreeSlotsOf(InstanceID instanceId) {
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
if (taskManagerRegistration != null) {
return taskManagerRegistration.getNumberFreeSlots();
} else {
return 0;
}
}
3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();
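Returns the number of workers, per WorkerResourceSpec, that the SlotManager has requested via ResourceActions but that are not yet fulfilled. The pending slots are rounded up to whole workers of the default worker spec.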
@Override
public Map<WorkerResourceSpec, Integer> getRequiredResources() {
final int pendingWorkerNum =
MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
return pendingWorkerNum > 0
? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum)
: Collections.emptyMap();
}
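The arithmetic in `getRequiredResources` is a ceiling division: 5 pending slots with 2 slots per worker need 3 workers. The hypothetical `RequiredWorkers` below restates it. A String stands in for `WorkerResourceSpec`, and `divideRoundUp` is this sketch's own version, written in a form that avoids the overflow of `(a + b - 1) / b`.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical re-statement of the getRequiredResources() arithmetic. */
class RequiredWorkers {
    /** Ceiling division for dividend >= 0 and divisor > 0. */
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("need dividend >= 0 and divisor > 0");
        }
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    /** Maps the single default worker spec to the number of workers still needed. */
    static Map<String, Integer> requiredResources(
            String defaultWorkerSpec, int pendingSlots, int numSlotsPerWorker) {
        int pendingWorkers = divideRoundUp(pendingSlots, numSlotsPerWorker);
        return pendingWorkers > 0
                ? Collections.singletonMap(defaultWorkerSpec, pendingWorkers)
                : Collections.emptyMap();
    }
}
```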
3.3.8. ResourceProfile getRegisteredResource();
Returns the total registered resources.
@Override
public ResourceProfile getRegisteredResource() {
return getResourceFromNumSlots(getNumberRegisteredSlots());
}
3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);
Returns the registered resources of the given TaskManager instance.
@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
}
3.3.10. ResourceProfile getFreeResource();
Returns the total free resources.
@Override
public ResourceProfile getFreeResource() {
return getResourceFromNumSlots(getNumberFreeSlots());
}
3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);
Returns the free resources of the given TaskManager instance.
@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
}
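All four resource getters delegate to `getResourceFromNumSlots`, whose body is not shown here. A plausible reading, assumed in the sketch below, is that it scales the default per-slot profile by the slot count and yields an "unknown" result when no default profile exists or the count is negative. `ResourceEstimate` is hypothetical, and an empty `Optional` stands in for an unknown profile.

```java
import java.util.Optional;

/** Hypothetical sketch: total resources = default per-slot profile x number of slots. */
final class ResourceEstimate {
    final double cpuCores;
    final long memoryMb;

    ResourceEstimate(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Empty result models an "unknown" profile (no default profile, or a negative count). */
    static Optional<ResourceEstimate> fromNumSlots(ResourceEstimate defaultSlot, int numSlots) {
        if (defaultSlot == null || numSlots < 0) {
            return Optional.empty();
        }
        return Optional.of(new ResourceEstimate(
                defaultSlot.cpuCores * numSlots, defaultSlot.memoryMb * numSlots));
    }
}
```

Under this reading, the result is exact only if every registered slot has the default profile.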
3.3.12. int getNumberPendingSlotRequests();
Returns the number of pending slot requests.
@Override
public int getNumberPendingSlotRequests() {
return pendingSlotRequests.size();
}
3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);
Notifies the slot manager about the resource requirements of a job. In SlotManagerImpl this is a no-op.
@Override
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
// no-op; don't throw an UnsupportedOperationException here because there are code paths
// where the resource manager calls this method regardless of whether declarative
// resource management is used or not
}
3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)
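Registers a slot request. Duplicate requests (same allocation id) are ignored. Otherwise the request is recorded as pending, and internalRequestSlot tries to fulfill it.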
/**
* Requests a slot with the respective resource profile.
*
* @param slotRequest specifying the requested slot specs
* @return true if the slot request was registered; false if the request is a duplicate
* @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
*/
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
checkInit();
if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug(
"Ignoring a duplicate slot request with allocation id {}.",
slotRequest.getAllocationId());
return false;
} else {
// build the pending request
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
// try to fulfill the request
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException(
"Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}
/**
* Tries to allocate a slot for the given slot request. If there is no slot available, the
* resource manager is informed to allocate more resources and a timeout for the request is
* registered.
*
* @param pendingSlotRequest to allocate a slot for
* @throws ResourceManagerException if the slot request failed or is unfulfillable
*/
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
throws ResourceManagerException {
// the resource profile requested by the slot request
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot -> {
// a matching free slot exists: allocate it
allocateSlot(taskManagerSlot, pendingSlotRequest);
})
.ifNotPresent(() -> {
// no matching free slot: fall back to a pending TaskManager slot (may start a new TaskManager)
fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
});
}
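The two methods above implement a short flow: deduplicate by allocation id, record the request as pending, serve it from a free slot if one matches, otherwise fall back to a slot on a new worker, and roll back the pending entry if that fails. The sketch below models this flow with the hypothetical class `SlotRequestFlow`. It assumes `checkDuplicateRequest` consults both the pending and the fulfilled maps, since its body is not shown. It also treats allocation as instantaneous, whereas the real `allocateSlot` asks the TaskExecutor asynchronously. An unchecked `IllegalStateException` replaces `ResourceManagerException`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, heavily simplified model of registerSlotRequest + internalRequestSlot. */
class SlotRequestFlow {
    final LinkedHashMap<String, Integer> freeSlots = new LinkedHashMap<>(); // slotId -> size
    final Map<String, Integer> pendingRequests = new HashMap<>();   // allocationId -> size
    final Map<String, String> fulfilledRequests = new HashMap<>();  // allocationId -> slotId
    int requestedWorkerSlots = 0; // stand-in for asking ResourceActions for new workers
    final int maxPendingWorkerSlots;

    SlotRequestFlow(int maxPendingWorkerSlots) {
        this.maxPendingWorkerSlots = maxPendingWorkerSlots;
    }

    /** Returns false for a duplicate allocation id, true once the request is registered. */
    boolean registerSlotRequest(String allocationId, int size) {
        if (pendingRequests.containsKey(allocationId)
                || fulfilledRequests.containsKey(allocationId)) {
            return false; // duplicate request: ignore it
        }
        pendingRequests.put(allocationId, size);
        try {
            internalRequestSlot(allocationId, size);
        } catch (IllegalStateException e) {
            pendingRequests.remove(allocationId); // roll back, as the original does
            throw e;
        }
        return true;
    }

    private void internalRequestSlot(String allocationId, int size) {
        Iterator<Map.Entry<String, Integer>> it = freeSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> slot = it.next();
            if (slot.getValue() >= size) { // matching free slot: allocate it
                it.remove();
                pendingRequests.remove(allocationId);
                fulfilledRequests.put(allocationId, slot.getKey());
                return;
            }
        }
        // no matching slot: request a slot on a new worker; the request stays pending
        if (requestedWorkerSlots >= maxPendingWorkerSlots) {
            throw new IllegalStateException("Could not fulfill slot request " + allocationId);
        }
        requestedWorkerSlots++;
    }
}
```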
3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)
Cancels and removes a pending slot request.
/**
* Cancels and removes a pending slot request with the given allocation id. If there is no such
* pending request, then nothing is done.
*
* @param allocationId identifying the pending slot request
* @return True if a pending slot request was found; otherwise false
*/
@Override
public boolean unregisterSlotRequest(AllocationID allocationId) {
checkInit();
PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
if (null != pendingSlotRequest) {
LOG.debug("Cancel slot request {}.", allocationId);
cancelPendingSlotRequest(pendingSlotRequest);
return true;
} else {
LOG.debug(
"No pending slot request with allocation id {} found. Ignoring unregistration request.",
allocationId);
return false;
}
}
3.3.16. registerTaskManager
Registers a TaskManager.
/**
* Registers a new task manager at the slot manager. This makes the task manager's slots
* known and, thus, available for allocation.
*
* @param taskExecutorConnection for the new task manager
* @param initialSlotReport for the new task manager
* @return True if the task manager has not been registered before and is registered
* successfully; otherwise false
*/
@Override
public boolean registerTaskManager(
final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
// initialization check: the slot manager must have been started
checkInit();
LOG.debug(
"Registering TaskManager {} under {} at the SlotManager.",
taskExecutorConnection.getResourceID().getStringWithMetadata(),
taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
// already registered: just report the slot status
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
// would registering this TaskManager exceed the maximum number of slots?
LOG.info
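Only the first branches of registerTaskManager appear above. A known instance id only triggers a slot-status report, and a new TaskManager is checked against `maxSlotNum` before it is accepted. The sketch below models those branches with the hypothetical class `TaskManagerRegistry`. It assumes the limit check is "registered slots + reported slots > maxSlotNum". The real `isMaxSlotNumExceededAfterRegistration` is not shown here and may also account for pending slots.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the first branches of registerTaskManager. */
class TaskManagerRegistry {
    enum Outcome { ALREADY_REGISTERED, REJECTED_MAX_SLOTS, REGISTERED }

    private final int maxSlotNum;
    private final Map<String, Integer> slotsPerInstance = new HashMap<>(); // instanceId -> slots
    private int registeredSlots = 0;

    TaskManagerRegistry(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    Outcome register(String instanceId, int reportedSlots) {
        if (slotsPerInstance.containsKey(instanceId)) {
            // known instance: the real code re-reports the slot status and returns false
            return Outcome.ALREADY_REGISTERED;
        }
        // assumed check: would the reported slots push the total beyond maxSlotNum?
        if (registeredSlots + reportedSlots > maxSlotNum) {
            return Outcome.REJECTED_MAX_SLOTS;
        }
        slotsPerInstance.put(instanceId, reportedSlots);
        registeredSlots += reportedSlots;
        return Outcome.REGISTERED;
    }

    int getNumberRegisteredSlots() {
        return registeredSlots;
    }
}
```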