[Flink] Flink 1.12.2 SlotManager
Posted by 九师兄
Reprinted from: Flink 1.12.2 Source Code Analysis: SlotManager
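The SlotManager is responsible for maintaining a view of all registered TaskManager slots, their allocations, and all pending slot requests.
Whenever a new slot is registered or an allocated slot is released, it tries to fulfill another pending slot request. Whenever there are not enough free slots, the SlotManager notifies the ResourceManager via {@link ResourceActions#allocateResource(WorkerResourceSpec)}.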
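To make this behaviour concrete, here is a deliberately simplified toy model. The class `ToySlotManager` is hypothetical and not Flink code: slots and requests are plain strings, a newly available slot is immediately matched against the oldest pending request, and a request that finds no free slot increments a counter that stands in for `ResourceActions#allocateResource`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: slots and requests are Strings, not Flink types.
final class ToySlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    private final List<String> assignments = new ArrayList<>();
    private int workersRequested = 0;

    // A job asks for a slot: use a free slot if there is one, otherwise ask for a new worker.
    void requestSlot(String requestId) {
        pendingRequests.addLast(requestId);
        if (!tryFulfill()) {
            workersRequested++; // stands in for ResourceActions#allocateResource
        }
    }

    // A new slot was registered, or an allocated slot was released.
    void slotAvailable(String slotId) {
        freeSlots.addLast(slotId);
        tryFulfill();
    }

    // Matches the oldest pending request with the oldest free slot, if both exist.
    private boolean tryFulfill() {
        if (freeSlots.isEmpty() || pendingRequests.isEmpty()) {
            return false;
        }
        assignments.add(pendingRequests.pollFirst() + "->" + freeSlots.pollFirst());
        return true;
    }

    List<String> assignments() { return assignments; }
    int workersRequested() { return workersRequested; }
    int pendingCount() { return pendingRequests.size(); }

    public static void main(String[] args) {
        ToySlotManager sm = new ToySlotManager();
        sm.requestSlot("req-1");    // no free slot yet: one worker is requested
        sm.slotAvailable("slot-A"); // the pending request is fulfilled now
        System.out.println(sm.assignments() + " workers=" + sm.workersRequested());
    }
}
```

The real SlotManager matches on resource profiles rather than taking "any slot", but the event-driven shape (try to fulfill on every slot arrival, request resources on a miss) is the same idea.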
2. SlotManager
2.1. Introduction
SlotManager is an interface that defines the slot-management operations used by the ResourceManager.
2.2. Interface methods
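Name | Description |
--- | --- |
start | Starts the SlotManager |
suspend | Suspends the SlotManager |
int getNumberRegisteredSlots() | Returns the number of registered slots |
int getNumberRegisteredSlotsOf(InstanceID instanceId) | Returns the number of slots registered by the TaskManager with the given InstanceID |
int getNumberFreeSlots() | Returns the number of free slots |
int getNumberFreeSlotsOf(InstanceID instanceId) | Returns the number of free slots of the TaskManager with the given InstanceID |
Map<WorkerResourceSpec, Integer> getRequiredResources() | Returns the number of workers, per WorkerResourceSpec, that the SlotManager has requested from {@link ResourceActions} but that have not been fulfilled yet |
ResourceProfile getRegisteredResource() | Returns the total registered resources |
ResourceProfile getRegisteredResourceOf(InstanceID instanceID) | Returns the registered resources of the TaskManager with the given InstanceID |
ResourceProfile getFreeResource() | Returns the total free resources |
ResourceProfile getFreeResourceOf(InstanceID instanceID) | Returns the free resources of the TaskManager with the given InstanceID |
int getNumberPendingSlotRequests() | Returns the number of pending slot requests |
void processResourceRequirements(ResourceRequirements resourceRequirements) | Notifies the SlotManager about the resources a job requires |
boolean registerSlotRequest(SlotRequest slotRequest) | Registers (issues) a slot request |
boolean unregisterSlotRequest(AllocationID allocationId) | Cancels a pending slot request |
registerTaskManager | Registers a TaskManager |
unregisterTaskManager | Unregisters a TaskManager |
boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) | Reports the slot status of a TaskManager |
void freeSlot(SlotID slotId, AllocationID allocationId) | Frees a slot |
void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) | Sets whether unfulfillable slot requests fail immediately instead of staying pending |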
3. SlotManagerImpl
3.1. Fields
/** Map for all registered slots. */
private final HashMap<SlotID, TaskManagerSlot> slots;
/** Index of all currently free slots. */
private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
/** All currently registered task managers. */
private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
/** Map of fulfilled and active allocations for request deduplication purposes. */
private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
/** Map of pending/unfulfilled slot allocation requests. */
private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
/** Scheduled executor for timeouts. */
private final ScheduledExecutor scheduledExecutor;
/** Timeout for slot requests to the task manager. */
private final Time taskManagerRequestTimeout;
/** Timeout after which an allocation is discarded. */
private final Time slotRequestTimeout;
/** Timeout after which an unused TaskManager is released. */
private final Time taskManagerTimeout;
private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
private final SlotMatchingStrategy slotMatchingStrategy;
/** ResourceManager's id. */
private ResourceManagerId resourceManagerId;
/** Executor for future callbacks which have to be "synchronized". */
private Executor mainThreadExecutor;
/** Callbacks for resource (de-)allocations. */
private ResourceActions resourceActions;
private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
private ScheduledFuture<?> slotRequestTimeoutCheck;
/** True iff the component has been started. */
private boolean started;
/** Release task executor only when each produced result partition is either consumed or failed. */
private final boolean waitResultConsumedBeforeRelease;
/** Defines the max limitation of the total number of slots. */
private final int maxSlotNum;
/** Defines the number of redundant taskmanagers. */
private final int redundantTaskManagerNum;
/**
 * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable requests
 * to pend. A slot request is considered unfulfillable if it can be fulfilled neither by a slot
 * that is already registered (including allocated ones) nor by a pending slot that the {@link
 * ResourceActions} can allocate.
 */
private boolean failUnfulfillableRequest = true;
/** The default resource spec of workers to request. */
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final ResourceProfile defaultSlotResourceProfile;
private final SlotManagerMetricGroup slotManagerMetricGroup;
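`freeSlots` is a `LinkedHashMap`, so iterating over it visits free slots in the order in which they were put into the map. The hypothetical sketch below shows a first-fit search over such a map. It assumes a matching strategy that returns the first free slot whose resources are sufficient (our reading of Flink's default any-matching strategy, not verified here) and reduces a `ResourceProfile` to a single memory size in MB.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a slot's resource profile is reduced to a memory size in MB.
final class FirstFitMatching {

    // Returns the first free slot, in insertion order, that offers at least requiredMb.
    static Optional<String> findMatchingSlot(int requiredMb, LinkedHashMap<String, Integer> freeSlots) {
        for (Map.Entry<String, Integer> slot : freeSlots.entrySet()) {
            if (slot.getValue() >= requiredMb) {
                return Optional.of(slot.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> freeSlots = new LinkedHashMap<>();
        freeSlots.put("tm1-slot0", 512);
        freeSlots.put("tm2-slot0", 2048);
        freeSlots.put("tm1-slot1", 2048);
        // tm2-slot0 was inserted before tm1-slot1, so it wins the first-fit search.
        System.out.println(findMatchingSlot(1024, freeSlots).orElse("none"));
    }
}
```

A plain `HashMap` would also work for lookups, but its iteration order is unspecified. A linked map makes "which free slot is picked first" deterministic.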
3.2. Constructor
public SlotManagerImpl(
        ScheduledExecutor scheduledExecutor,
        SlotManagerConfiguration slotManagerConfiguration,
        SlotManagerMetricGroup slotManagerMetricGroup) {
    this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
    this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
    this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
    this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
    this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
    this.waitResultConsumedBeforeRelease =
            slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
    this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
    this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
    this.defaultSlotResourceProfile =
            generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
    this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
    this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
    this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

    slots = new HashMap<>(16);
    freeSlots = new LinkedHashMap<>(16);
    taskManagerRegistrations = new HashMap<>(4);
    fulfilledSlotRequests = new HashMap<>(16);
    pendingSlotRequests = new HashMap<>(16);
    pendingSlots = new HashMap<>(16);

    resourceManagerId = null;
    resourceActions = null;
    mainThreadExecutor = null;
    taskManagerTimeoutsAndRedundancyCheck = null;
    slotRequestTimeoutCheck = null;

    started = false;
}
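The constructor derives `defaultSlotResourceProfile` from the default worker spec and `numSlotsPerWorker`. Assuming `generateDefaultSlotResourceProfile` splits the worker's resources evenly across its slots, the arithmetic looks like the sketch below. `DefaultSlotProfile` is a hypothetical class that keeps only CPU and memory as resource dimensions.

```java
// Hypothetical sketch: a worker is described only by CPU cores and memory in MB.
final class DefaultSlotProfile {
    final double cpuCores;
    final long memoryMb;

    DefaultSlotProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Splits a worker's resources evenly across numSlotsPerWorker slots.
    static DefaultSlotProfile fromWorker(double workerCpuCores, long workerMemoryMb, int numSlotsPerWorker) {
        if (numSlotsPerWorker <= 0) {
            throw new IllegalArgumentException("numSlotsPerWorker must be positive");
        }
        return new DefaultSlotProfile(
                workerCpuCores / numSlotsPerWorker,
                workerMemoryMb / numSlotsPerWorker); // long division: the remainder is dropped
    }

    public static void main(String[] args) {
        DefaultSlotProfile slot = fromWorker(4.0, 4096, 4);
        System.out.println(slot.cpuCores + " cores, " + slot.memoryMb + " MB per slot");
    }
}
```

In this sketch integer division drops any remainder, so a 1000 MB worker with 3 slots yields 333 MB per slot.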
3.3. Methods
3.3.1. start
/**
 * Starts the slot manager with the given leader id and resource manager actions.
 *
 * @param newResourceManagerId to use for communication with the task managers
 * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
 * @param newResourceActions to use for resource (de-)allocations
 */
public void start(
        ResourceManagerId newResourceManagerId,
        Executor newMainThreadExecutor,
        ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    started = true;

    // periodically check for idle TaskManagers and for the number of redundant TaskManagers
    taskManagerTimeoutsAndRedundancyCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () ->
                            mainThreadExecutor.execute(
                                    () -> checkTaskManagerTimeoutsAndRedundancy()),
                    0L,
                    taskManagerTimeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS);

    // periodically check for slot requests that have timed out
    slotRequestTimeoutCheck =
            scheduledExecutor.scheduleWithFixedDelay(
                    () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()),
                    0L,
                    slotRequestTimeout.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
}
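`start()` triggers both periodic checks from `scheduledExecutor` but runs the check bodies through `mainThreadExecutor`. Presumably this keeps them on the same thread as the rest of the ResourceManager logic. The same pattern can be sketched with plain JDK executors. In the sketch, `ScheduledExecutorService` stands in for Flink's `ScheduledExecutor`, a single-thread `ExecutorService` stands in for the ResourceManager main thread, and a `CountDownLatch` replaces the real check (all assumptions of this sketch).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch with JDK executors only; this is not Flink's ScheduledExecutor API.
final class PeriodicCheckDemo {

    // Runs a dummy "check" on the main-thread executor until it has run `times` times.
    static int runChecks(int times, long delayMs) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        CountDownLatch checksRun = new CountDownLatch(times);
        try {
            // The timer thread only triggers; the check body executes on mainThread.
            ScheduledFuture<?> check = timer.scheduleWithFixedDelay(
                    () -> mainThread.execute(checksRun::countDown),
                    0L, delayMs, TimeUnit.MILLISECONDS);
            checksRun.await(5, TimeUnit.SECONDS);
            check.cancel(false); // stop future runs without interrupting a running one
            return times - (int) checksRun.getCount();
        } finally {
            timer.shutdownNow();
            mainThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("checks run: " + runChecks(3, 10));
    }
}
```

Keeping the returned `ScheduledFuture` is what makes it possible to stop the periodic work later, which is why `SlotManagerImpl` stores both futures in fields.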
3.3.2. suspend
/** Suspends the component. This clears the internal state of the slot manager. */
public void suspend() {
    LOG.info("Suspending the SlotManager.");

    // stop the timeout checks for the TaskManagers and the SlotRequests
    if (taskManagerTimeoutsAndRedundancyCheck != null) {
        taskManagerTimeoutsAndRedundancyCheck.cancel(false);
        taskManagerTimeoutsAndRedundancyCheck = null;
    }

    if (slotRequestTimeoutCheck != null) {
        slotRequestTimeoutCheck.cancel(false);
        slotRequestTimeoutCheck = null;
    }

    for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
        cancelPendingSlotRequest(pendingSlotRequest);
    }

    pendingSlotRequests.clear();

    ArrayList<InstanceID> registeredTaskManagers =
            new ArrayList<>(taskManagerRegistrations.keySet());

    for (InstanceID registeredTaskManager : registeredTaskManagers) {
        unregisterTaskManager(
                registeredTaskManager,
                new SlotManagerException("The slot manager is being suspended."));
    }

    resourceManagerId = null;
    resourceActions = null;
    started = false;
}
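`suspend()` copies the registered instance ids into a new `ArrayList` before unregistering them. The copy matters because unregistering removes entries from `taskManagerRegistrations`, and removing from a `HashMap` while iterating over its own `keySet()` can throw `ConcurrentModificationException`. The toy class below uses hypothetical names and `String` ids instead of Flink types. It follows the same teardown order: drop pending requests, unregister every task manager through a copy of the key set, then mark the component as stopped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy: ids are Strings, values are slot counts or request sizes.
final class SuspendDemo {
    final Map<String, Integer> taskManagerRegistrations = new HashMap<>();
    final Map<String, Integer> pendingSlotRequests = new HashMap<>();
    final List<String> cancelledRequests = new ArrayList<>();
    boolean started = true;

    // Removes the entry, i.e. it mutates the map that suspend() walks over.
    void unregisterTaskManager(String instanceId) {
        taskManagerRegistrations.remove(instanceId);
    }

    void suspend() {
        // Cancel every pending request, then forget them all.
        cancelledRequests.addAll(pendingSlotRequests.keySet());
        pendingSlotRequests.clear();

        // Iterate over a copy so that unregisterTaskManager can safely remove entries.
        List<String> registered = new ArrayList<>(taskManagerRegistrations.keySet());
        for (String instanceId : registered) {
            unregisterTaskManager(instanceId);
        }
        started = false;
    }

    public static void main(String[] args) {
        SuspendDemo sm = new SuspendDemo();
        sm.taskManagerRegistrations.put("tm-1", 2);
        sm.taskManagerRegistrations.put("tm-2", 4);
        sm.pendingSlotRequests.put("alloc-1", 1024);
        sm.suspend();
        System.out.println(sm.taskManagerRegistrations.isEmpty() + " " + sm.cancelledRequests);
    }
}
```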
3.3.3. int getNumberRegisteredSlots()
public int getNumberRegisteredSlots() {
    return slots.size();
}
3.3.4. int getNumberRegisteredSlotsOf(InstanceID instanceId)
public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
    TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

    if (taskManagerRegistration != null) {
        return taskManagerRegistration.getNumberRegisteredSlots();
    } else {
        return 0;
    }
}
3.3.5. int getNumberFreeSlots()
public int getNumberFreeSlots() {
    return freeSlots.size();
}
3.3.6. int getNumberFreeSlotsOf(InstanceID instanceId)
public int getNumberFreeSlotsOf(InstanceID instanceId) {
    TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);

    if (taskManagerRegistration != null) {
        return taskManagerRegistration.getNumberFreeSlots();
    } else {
        return 0;
    }
}
3.3.7. Map<WorkerResourceSpec, Integer> getRequiredResources();
Returns the number of workers, per WorkerResourceSpec, that the SlotManager has requested from {@link ResourceActions} but that have not been fulfilled yet.
public Map<WorkerResourceSpec, Integer> getRequiredResources() {
    // pending slots -> number of pending workers, rounded up
    final int pendingWorkerNum =
            MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);

    return pendingWorkerNum > 0
            ? Collections.singletonMap(defaultWorkerResourceSpec, pendingWorkerNum)
            : Collections.emptyMap();
}
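`getRequiredResources()` turns the number of pending slots into a number of workers by rounding up, because every requested worker brings `numSlotsPerWorker` slots. With 5 pending slots and 2 slots per worker, 5 / 2 = 2.5, so 3 workers are still outstanding. Below is a self-contained sketch of that arithmetic. The class is hypothetical, the worker spec is reduced to a `String` key, and the round-up division is re-implemented here instead of calling Flink's `MathUtils`.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the pending-slots -> pending-workers computation.
final class RequiredWorkers {

    // Ceiling division for dividend >= 0 and divisor > 0, written to avoid int overflow.
    static int divideRoundUp(int dividend, int divisor) {
        if (dividend < 0 || divisor <= 0) {
            throw new IllegalArgumentException("need dividend >= 0 and divisor > 0");
        }
        return dividend == 0 ? 0 : (dividend - 1) / divisor + 1;
    }

    // Same shape as getRequiredResources(): one entry for the default spec, or an empty map.
    static Map<String, Integer> requiredResources(int pendingSlots, int numSlotsPerWorker, String defaultSpec) {
        int pendingWorkerNum = divideRoundUp(pendingSlots, numSlotsPerWorker);
        return pendingWorkerNum > 0
                ? Collections.singletonMap(defaultSpec, pendingWorkerNum)
                : Collections.emptyMap();
    }

    public static void main(String[] args) {
        System.out.println(requiredResources(5, 2, "default-worker")); // {default-worker=3}
    }
}
```

Writing the ceiling as `(dividend - 1) / divisor + 1` rather than `(dividend + divisor - 1) / divisor` avoids overflow when `dividend` is close to `Integer.MAX_VALUE`.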
3.3.8. ResourceProfile getRegisteredResource();
public ResourceProfile getRegisteredResource() {
    return getResourceFromNumSlots(getNumberRegisteredSlots());
}
3.3.9. ResourceProfile getRegisteredResourceOf(InstanceID instanceID);
Returns the registered resources of the TaskManager with the given InstanceID.
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
    return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
}
3.3.10. ResourceProfile getFreeResource();
public ResourceProfile getFreeResource() {
    return getResourceFromNumSlots(getNumberFreeSlots());
}
3.3.11. ResourceProfile getFreeResourceOf(InstanceID instanceID);
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
    return getResourceFromNumSlots(getNumberFreeSlotsOf(instanceID));
}
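The four resource getters above all delegate to `getResourceFromNumSlots`, whose body is not shown here. Since these getters only pass a slot count, the result has to be derived from a per-slot profile. The sketch assumes that this is `defaultSlotResourceProfile` multiplied by the count. `SlotProfile` is a hypothetical two-field profile, and returning `null` for a negative count is a choice of this sketch only.

```java
// Hypothetical two-dimensional resource profile: CPU cores and memory in MB.
final class SlotProfile {
    final double cpuCores;
    final long memoryMb;

    SlotProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Scales every dimension by n.
    SlotProfile multiply(int n) {
        return new SlotProfile(cpuCores * n, memoryMb * n);
    }

    // Total resources of numSlots slots that all use defaultSlot; null for invalid counts.
    static SlotProfile fromNumSlots(SlotProfile defaultSlot, int numSlots) {
        return numSlots < 0 ? null : defaultSlot.multiply(numSlots);
    }

    public static void main(String[] args) {
        SlotProfile total = fromNumSlots(new SlotProfile(0.5, 512), 6);
        System.out.println(total.cpuCores + " cores, " + total.memoryMb + " MB");
    }
}
```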
3.3.12. int getNumberPendingSlotRequests();
public int getNumberPendingSlotRequests() {
    return pendingSlotRequests.size();
}
3.3.13. void processResourceRequirements(ResourceRequirements resourceRequirements);
Notifies the SlotManager about the resources a job requires.
public void processResourceRequirements(ResourceRequirements resourceRequirements) {
    // no-op; don't throw an UnsupportedOperationException here because there are code paths
    // where the resource manager calls this method regardless of whether declarative resource
    // management is used or not
}
3.3.14. boolean registerSlotRequest(SlotRequest slotRequest)
/**
 * Requests a slot with the respective resource profile.
 *
 * @param slotRequest specifying the requested slot specs
 * @return true if the slot request was registered; false if the request is a duplicate
 * @throws ResourceManagerException if the slot request failed (e.g. not enough resources left)
 */
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
    if (checkDuplicateRequest(slotRequest.getAllocationId())) {
        LOG.debug(
                "Ignoring a duplicate slot request with allocation id {}.",
                slotRequest.getAllocationId());

        return false;
    } else {
        PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

        pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

        try {
            // issue the actual request (internalRequestSlot, shown next)
            internalRequestSlot(pendingSlotRequest);
        } catch (ResourceManagerException e) {
            // requesting the slot failed --> remove pending slot request
            pendingSlotRequests.remove(slotRequest.getAllocationId());

            throw new ResourceManagerException(
                    "Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
        }

        return true;
    }
}
/**
 * Tries to allocate a slot for the given slot request. If there is no slot available, the
 * resource manager is informed to allocate more resources and a timeout for the request is
 * registered.
 *
 * @param pendingSlotRequest to allocate a slot for
 * @throws ResourceManagerException if the slot request failed or is unfulfillable
 */
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest)
        throws ResourceManagerException {
    // get the requested resource profile...
    final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

    OptionalConsumer.of(findMatchingSlot(resourceProfile))
            .ifPresent(
                    taskManagerSlot -> {
                        // a matching taskManagerSlot exists: allocate it
                        allocateSlot(taskManagerSlot, pendingSlotRequest);
                    })
            .ifNotPresent(
                    () -> {
                        // no matching taskManagerSlot ==> start a TaskManager
                        fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
                    });
}
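`internalRequestSlot` branches on whether a matching free slot was found by chaining `.ifPresent(...)` and `.ifNotPresent(...)`. With plain JDK types (Java 9+), the same two-way branch can be written as `Optional.ifPresentOrElse`. The sketch below uses hypothetical `String` slots and only records which branch ran.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: slots and requests are Strings; each branch only logs its decision.
final class RequestSlotBranch {
    final List<String> decisions = new ArrayList<>();

    void internalRequestSlot(String requestId, Optional<String> matchingSlot) {
        matchingSlot.ifPresentOrElse(
                // a free slot matches: allocate it to the request
                slot -> decisions.add("allocate " + slot + " to " + requestId),
                // nothing matches: ask for more resources (a new TaskManager)
                () -> decisions.add("request TaskManager for " + requestId));
    }

    public static void main(String[] args) {
        RequestSlotBranch branch = new RequestSlotBranch();
        branch.internalRequestSlot("req-1", Optional.of("tm1-slot0"));
        branch.internalRequestSlot("req-2", Optional.empty());
        System.out.println(branch.decisions);
    }
}
```

One caveat: the JDK fallback is a `Runnable`, which cannot throw checked exceptions. A fallback that throws a checked exception such as `ResourceManagerException` therefore needs a helper that accepts a throwing callback instead.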
3.3.15. boolean unregisterSlotRequest(AllocationID allocationId)
/**
 * Cancels and removes a pending slot request with the given allocation id. If there is no such
 * pending request, then nothing is done.
 *
 * @param allocationId identifying the pending slot request
 * @return True if a pending slot request was found; otherwise false
 */
public boolean unregisterSlotRequest(AllocationID allocationId) {
    PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

    if (null != pendingSlotRequest) {
        LOG.debug("Cancel slot request {}.", allocationId);

        cancelPendingSlotRequest(pendingSlotRequest);

        return true;
    } else {
        LOG.debug(
                "No pending slot request with allocation id {} found. Ignoring unregistration request.",
                allocationId);

        return false;
    }
}
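Taken together, `registerSlotRequest` and `unregisterSlotRequest` are keyed by the allocation id. A request whose id is already known is ignored, and unregistering only succeeds for requests that are still pending. The toy registry below sketches that contract. It assumes that the duplicate check looks at both `pendingSlotRequests` and `fulfilledSlotRequests`, since the field comment in 3.1 says the latter exists for deduplication. Ids are `String`s and a request is just a memory size; all names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy registry: allocation ids are Strings, a request is a memory size in MB.
final class SlotRequestRegistry {
    final Map<String, Integer> pendingSlotRequests = new HashMap<>();
    final Set<String> fulfilledSlotRequests = new HashSet<>();

    // Assumed duplicate check: pending and already fulfilled ids both count as duplicates.
    boolean checkDuplicateRequest(String allocationId) {
        return pendingSlotRequests.containsKey(allocationId)
                || fulfilledSlotRequests.contains(allocationId);
    }

    // Returns false for duplicates, true when the request is newly registered as pending.
    boolean registerSlotRequest(String allocationId, int memoryMb) {
        if (checkDuplicateRequest(allocationId)) {
            return false;
        }
        pendingSlotRequests.put(allocationId, memoryMb);
        return true;
    }

    // A slot was assigned: the request moves from pending to fulfilled.
    void markFulfilled(String allocationId) {
        if (pendingSlotRequests.remove(allocationId) != null) {
            fulfilledSlotRequests.add(allocationId);
        }
    }

    // True only if a pending request with this id existed and has now been removed.
    boolean unregisterSlotRequest(String allocationId) {
        return pendingSlotRequests.remove(allocationId) != null;
    }

    public static void main(String[] args) {
        SlotRequestRegistry registry = new SlotRequestRegistry();
        System.out.println(registry.registerSlotRequest("alloc-1", 1024)); // true
        System.out.println(registry.registerSlotRequest("alloc-1", 1024)); // false (duplicate)
        System.out.println(registry.unregisterSlotRequest("alloc-1"));     // true
    }
}
```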
3.3.16. registerTaskManager
/**
 * Registers a new task manager at the slot manager. This will make the task managers slots
 * known and, thus, available for allocation.
 *
 * @param taskExecutorConnection for the new task manager
 * @param initialSlotReport for the new task manager
 * @return True if the task manager has not been registered before and is registered
 *     successfully; otherwise false
 */
public boolean registerTaskManager(
        final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
    // initialization check: fails with
    // "The slot manager has not been started." if start() has not been called yet
    checkInit();

    LOG.debug(
            "Registering TaskManager {} under {} at the SlotManager.",
            taskExecutorConnection.getResourceID(),
            taskExecutorConnection.getInstanceID());

    // we identify task managers by their instance id
    if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
        // already registered before: just report the slot status
        reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
        return false;
    } else {
        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
            // would the maximum number of slots be exceeded...
            LOG.info(
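The part of `registerTaskManager` shown above has three outcomes. An already-known instance id only triggers a slot status report. A registration that would exceed the slot limit is handled separately (in this sketch it is simply rejected). Anything else is registered. The hypothetical registry below sketches those branches. Its `isMaxSlotNumExceededAfterRegistration` just compares registered plus reported slots with `maxSlotNum`, which is a simplification and not necessarily what Flink checks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a TaskManager is a String instance id that reports a number of slots.
final class TaskManagerRegistry {
    private final Map<String, Integer> slotsByInstanceId = new HashMap<>();
    private final int maxSlotNum;
    int slotStatusReports = 0;

    TaskManagerRegistry(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    int registeredSlots() {
        return slotsByInstanceId.values().stream().mapToInt(Integer::intValue).sum();
    }

    // Simplified limit check: registered slots plus newly reported slots vs. the maximum.
    boolean isMaxSlotNumExceededAfterRegistration(int reportedSlots) {
        return registeredSlots() + reportedSlots > maxSlotNum;
    }

    boolean registerTaskManager(String instanceId, int reportedSlots) {
        if (slotsByInstanceId.containsKey(instanceId)) {
            slotStatusReports++; // known instance: treat the report as a status update
            return false;
        }
        if (isMaxSlotNumExceededAfterRegistration(reportedSlots)) {
            return false; // over the limit: this sketch simply rejects it
        }
        slotsByInstanceId.put(instanceId, reportedSlots);
        return true;
    }

    public static void main(String[] args) {
        TaskManagerRegistry registry = new TaskManagerRegistry(4);
        System.out.println(registry.registerTaskManager("tm-1", 2)); // true
        System.out.println(registry.registerTaskManager("tm-1", 2)); // false (already known)
        System.out.println(registry.registerTaskManager("tm-2", 3)); // false (2 + 3 > 4)
    }
}
```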