FlinkFlink 1.12.2 TaskSlot
Posted 九师兄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink 1.12.2 TaskSlot相关的知识,希望对你有一定的参考价值。
1.概述
转载:Flink 1.12.2 源码浅析 : TaskSlot
属于同一slot的多个{@link TaskSlotPayload tasks}
的容器。
TaskSlot 可以处于以下状态之一:
- 空闲[Free]-slot为空,未分配给作业
- 释放中[Releasing]-slot变空后即将释放。
- 已分配[Allocated]-已为作业分配slot。
- 活动[Active]-slot正由job manager使用, job manager 是分配job 的负责人
-
只有在task slot处于空闲状态时才能分配它。
-
分配的task slot可以转换为活动状态。
-
活动slot允许从相应作业添加具有正确分配id的任务。
-
活动slot可以标记为非活动,从而将状态设置回已分配状态。
-
分配的或活动的slot只有在为空时才能释放。
-
如果它不是空的,那么它的状态可以设置为releasing,表示一旦它变空就可以被释放。
二 . 属性
/**
* task slot的下标索引
* Index of the task slot.
* */
private final int index;
/**
* 此插槽的资源特征。
* Resource characteristics for this slot.
* */
private final ResourceProfile resourceProfile;
/**
* 在这个slot中运行的task
* Tasks running in this slot.
* */
private final Map<ExecutionAttemptID, T> tasks;
/**
* 内存管理
*/
private final MemoryManager memoryManager;
/**
* slot的状态:
* 1. ACTIVE, // Slot is in active use by a job manager responsible for a job
* 2. ALLOCATED, // Slot has been allocated for a job but not yet given to a job manager
* 3. RELEASING // Slot is not empty but tasks are failed. Upon removal of all tasks, it will be released
*
* State of this slot. */
private TaskSlotState state;
/**
* 已分配插槽的 job id。
* Job id to which the slot has been allocated.
* */
private final JobID jobId;
/**
* 此插槽的Allocation id。
* Allocation id of this slot.
* */
private final AllocationID allocationId;
/**
* 当插槽被释放和关闭时,关闭操作完成。
* The closing future is completed when the slot is freed and closed.
* */
private final CompletableFuture<Void> closingFuture;
/**
* {@link Executor}用于后台操作,例如验证所有已释放的托管内存
* {@link Executor} for background actions, e.g. verify all managed memory released.
* */
private final Executor asyncExecutor;
Slot的状态
ACTIVE : Slot 已经被 job manager 使用
ALLOCATED : Slot 已经被分配,但是尚未分配Job manager使用.
RELEASING : Slot 不为空,但task失败. 在移除所有任务后,它将被释放 .
三. 方法
3.1. 任务相关
3.1.1. 获取标识
/**
* Generate the slot offer from this TaskSlot.
*
* @return The sot offer which this task slot can provide
*/
public SlotOffer generateSlotOffer() {
Preconditions.checkState(
TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == state,
"The task slot is not in state active or allocated.");
Preconditions.checkState(allocationId != null, "The task slot are not allocated");
return new SlotOffer(allocationId, index, resourceProfile);
}
3.1.2. 添加任务
将给定任务添加到任务slot。
仅当还没有另一个具有相同执行尝试ID的任务添加到任务slot时,才有可能。
在这种情况下,该方法返回true。
否则,任务slot将保持不变,并返回false。
如果任务slot状态未激活,则会抛出{@link IllegalStateException}。
如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。
/**
* 将给定任务添加到任务slot。
* 仅当还没有另一个具有相同执行尝试ID的任务添加到任务slot时,才有可能。
* 在这种情况下,该方法返回true。
* 否则,任务slot将保持不变,并返回false。
*
* 如果任务slot状态未激活,则会抛出{@link IllegalStateException}。
*
* 如果任务的作业ID和分配ID与为其分配了任务slot的作业ID和分配ID不匹配,则会抛出{@link IllegalArgumentException}。
*
* Add the given task to the task slot. This is only possible if there is not already another
* task with the same execution attempt id added to the task slot. In this case, the method
* returns true. Otherwise the task slot is left unchanged and false is returned.
*
* <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.
* In case that the task's job id and allocation id don't match with the job id and allocation
* id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.
*
* @param task to be added to the task slot
* @throws IllegalStateException if the task slot is not in state active
* @return true if the task was added to the task slot; otherwise false
*/
public boolean add(T task) {
// Check that this slot has been assigned to the job sending this task
Preconditions.checkArgument(
task.getJobID().equals(jobId),
"The task's job id does not match the "
+ "job id for which the slot has been allocated.");
Preconditions.checkArgument(
task.getAllocationId().equals(allocationId),
"The task's allocation "
+ "id does not match the allocation id for which the slot has been allocated.");
Preconditions.checkState(
TaskSlotState.ACTIVE == state, "The task slot is not in state active.");
T oldTask = tasks.put(task.getExecutionId(), task);
if (oldTask != null) {
tasks.put(task.getExecutionId(), oldTask);
return false;
} else {
return true;
}
}
3.1.3. 获取任务
/**
* Get all tasks running in this task slot.
*
* @return Iterator to all currently contained tasks in this task slot.
*/
public Iterator<T> getTasks() {
return tasks.values().iterator();
}
3.1.4. 移除任务
删除由给定的execution attempt id标识的任务。
/**
* Remove the task identified by the given execution attempt id.
*
* @param executionAttemptId identifying the task to be removed
* @return The removed task if there was any; otherwise null.
*/
public T remove(ExecutionAttemptID executionAttemptId) {
return tasks.remove(executionAttemptId);
}
3.1.5. 清理所有task
/** Removes all tasks from this task slot. */
public void clear() {
tasks.clear();
}
3.2. 状态相关
名称 | 描述 |
---|---|
isActive | 状态是否为ACTIVE |
isAllocated | 状态为ACTIVE或者ALLOCATED |
isReleasing | 状态是否为RELEASING |
markActive | 标记状态为ACTIVE |
markInactive | 标记状态为ALLOCATED |
closeAsync | 关闭,标记状态为RELEASING |
3.3. get/set相关
index: int
resourceProfile: ResourceProfile
memoryManager: MemoryManager
state: TaskSlotState
jobId: JobID
allocationId: AllocationID
empty: boolean
releasing: boolean
tasks: Iterator<T>
以上是关于FlinkFlink 1.12.2 TaskSlot的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFlink 1.12.2 TaskSlotTable