[Flink] Flink 1.12.2 Source Code Walkthrough: TaskExecutor
Posted by 九师兄
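1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: TaskExecutor
TaskExecutor is the concrete implementation of the TaskManager.
2. TaskExecutorGateway
We start with its gateway, i.e. the RPC operations that other components can invoke on a TaskExecutor. They are listed below.
2.1. Class diagram
2.2. Method list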
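Name | Description |
---|---|
CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, @RpcTimeout Time timeout) | Requests a slot from the TaskManager |
requestTaskBackPressure | Returns back-pressure information for a task |
submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout) | [Core] Submits a task |
updatePartitions | Updates a task with the locations of the given partitions |
releaseOrPromotePartitions | Batch-releases or promotes intermediate result partitions |
releaseClusterPartitions | Releases all cluster partitions belonging to any of the given data sets |
triggerCheckpoint | Triggers a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
confirmCheckpoint | Confirms a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
abortCheckpoint | Aborts a checkpoint |
cancelTask | Cancels a task |
heartbeatFromJobManager | Heartbeat request from the JobManager |
heartbeatFromResourceManager | Heartbeat request from the ResourceManager |
disconnectJobManager | Disconnects the given JobManager from the TaskManager |
disconnectResourceManager | Disconnects the given ResourceManager from the TaskManager |
freeSlot | Frees a slot |
requestFileUploadByType | Requests the upload of a file of the given type to the cluster's BlobServer |
requestFileUploadByName | Requests the upload of the file with the given name to the cluster's BlobServer |
requestMetricQueryServiceAddress | Returns the address of the metric query service on the TaskManager |
canBeReleased | Checks whether the task executor can be released. It cannot be released while there are unconsumed result partitions. |
requestLogList | Requests the names of the historical log files on the TaskManager |
sendOperatorEventToTask | Sends an operator event to a task |
requestThreadDump | Requests a thread dump of the TaskManager |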
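All gateway methods are asynchronous RPCs: the caller immediately gets a `CompletableFuture` back, and errors are reported by completing that future exceptionally instead of throwing (this is what `requestSlot` does with `FutureUtils.completedExceptionally` further below). The following is a minimal sketch of that contract, not Flink code: `MiniGateway`, `InMemoryGateway`, `Ack` and the plain `int`/`String` identifiers are hypothetical simplifications of the real `TaskExecutorGateway`, `Acknowledge`, `SlotID` and `AllocationID` types.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Flink's Acknowledge singleton.
enum Ack { INSTANCE }

// Hypothetical, heavily simplified gateway: every call returns a future.
interface MiniGateway {
    CompletableFuture<Ack> requestSlot(int slotNumber, String allocationId);

    CompletableFuture<Ack> freeSlot(String allocationId);
}

// In-memory implementation used only to illustrate the async contract.
class InMemoryGateway implements MiniGateway {
    private final String[] slots; // slots[i] = owning allocation id, or null if free

    InMemoryGateway(int numberSlots) {
        this.slots = new String[numberSlots];
    }

    // Java 8 friendly way to build an exceptionally completed future.
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    @Override
    public CompletableFuture<Ack> requestSlot(int slotNumber, String allocationId) {
        if (slotNumber < 0 || slotNumber >= slots.length) {
            // Errors are delivered through the future, not thrown at the caller.
            return failed(new IllegalArgumentException("No slot " + slotNumber));
        }
        String owner = slots[slotNumber];
        if (owner != null && !owner.equals(allocationId)) {
            return failed(new IllegalStateException(
                    "Slot " + slotNumber + " is occupied by " + owner));
        }
        slots[slotNumber] = allocationId;
        return CompletableFuture.completedFuture(Ack.INSTANCE);
    }

    @Override
    public CompletableFuture<Ack> freeSlot(String allocationId) {
        for (int i = 0; i < slots.length; i++) {
            if (allocationId.equals(slots[i])) {
                slots[i] = null;
            }
        }
        // Acknowledged unconditionally, like TaskExecutor#freeSlot.
        return CompletableFuture.completedFuture(Ack.INSTANCE);
    }

    boolean isFree(int slotNumber) {
        return slots[slotNumber] == null;
    }
}
```

A caller would typically chain on the returned future (for example with `whenComplete`) rather than block on it.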
3. Code walkthrough
3.1. Fields
3.1.1. Services
// HA
/** The access to the leader election and retrieval services. */
private final HighAvailabilityServices haServices;
// TaskExecutor services, e.g. MemoryManager, IOManager, ShuffleEnvironment, etc.
private final TaskManagerServices taskExecutorServices;
/**
* The task manager configuration.
* */
private final TaskManagerConfiguration taskManagerConfiguration;
/** The fatal error handler to use in case of a fatal error. */
private final FatalErrorHandler fatalErrorHandler;
// The BLOB cache provides access to the BLOB services for permanent and transient BLOBs.
private final BlobCacheService blobCacheService;
private final LibraryCacheManager libraryCacheManager;
/** The address to metric query service on this Task Manager. */
@Nullable private final String metricQueryServiceAddress;
3.1.2. TaskManager-related services
/** The connection information of this task manager. */
private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
private final TaskManagerMetricGroup taskManagerMetricGroup;
/** The state manager for this task, providing state managers per slot. */
private final TaskExecutorLocalStateStoresManager localStateStoresManager;
/** Information provider for external resources. */
private final ExternalResourceInfoProvider externalResourceInfoProvider;
/** The network component in the task manager. */
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
private final Executor ioExecutor;
3.1.3. Task slot table
private final TaskSlotTable<Task> taskSlotTable;
private final JobTable jobTable;
private final JobLeaderService jobLeaderService;
private final LeaderRetrievalService resourceManagerLeaderRetriever;
3.1.4. ResourceManager-related
// ResourceManager connection state
@Nullable private ResourceManagerAddress resourceManagerAddress;
@Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection;
@Nullable private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@Nullable private UUID currentRegistrationTimeoutId;
private Map<JobID, Collection<CompletableFuture<ExecutionState>>>
taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);
3.1.5. Others
// Hardware description
private final HardwareDescription hardwareDescription;
// Memory configuration
private final TaskExecutorMemoryConfiguration memoryConfiguration;
// File cache
private FileCache fileCache;
// JobManager heartbeats
/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload>
jobManagerHeartbeatManager;
// ResourceManager heartbeats
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload>
resourceManagerHeartbeatManager;
// Partition tracking
private final TaskExecutorPartitionTracker partitionTracker;
// Back-pressure sampling
private final BackPressureSampleService backPressureSampleService;
3.2. Core methods
3.2.1. requestSlot
The SlotManager inside the ResourceManager calls requestSlot to request a slot from the TaskExecutor.
org.apache.flink.runtime.taskexecutor.TaskExecutor#requestSlot
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the
// instance/registration Id
// Log the request, for example:
// Receive slot request 3755cb8f9962a9a7738db04f2a02084c for job 694474d11da6100e82744c9e47e2f511 from resource manager with leader id 00000000000000000000000000000000.
log.info(
"Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId,
jobId,
resourceManagerId);
// Reject the request unless we are connected to this ResourceManager
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message =
String.format(
"TaskManager is not connected to the resource manager %s.",
resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
// [Key step] Allocate the slot
allocateSlot(slotId, jobId, allocationId, resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
// Get or create the JobTable.Job
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(
new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
// [Important] Offer the slots to the JobManager
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
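The control flow of `requestSlot` boils down to: reject the request if not connected to that ResourceManager, allocate the slot, get or create the job, roll the allocation back if job creation fails, and offer slots only if the job already has a JobMaster connection. The sketch below models just that rollback logic with hypothetical types (`RequestSlotSketch`, `SketchJob`, string IDs); it omits local-state cleanup and the fatal-error sanity checks of the real method.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical job record: "connected" means a JobMaster leader is known.
class SketchJob {
    final String jobId;
    boolean connected;

    SketchJob(String jobId) {
        this.jobId = jobId;
    }
}

class RequestSlotSketch {
    final Set<String> allocatedSlots = new HashSet<>(); // allocation ids
    final Map<String, SketchJob> jobs = new HashMap<>(); // stands in for JobTable
    final Set<String> offeredJobs = new HashSet<>(); // jobs we offered slots to
    boolean connectedToResourceManager = true;
    private final Function<String, SketchJob> jobFactory; // may throw

    RequestSlotSketch(Function<String, SketchJob> jobFactory) {
        this.jobFactory = jobFactory;
    }

    CompletableFuture<Void> requestSlot(String jobId, String allocationId) {
        if (!connectedToResourceManager) {
            return failed(new IllegalStateException("Not connected to the resource manager."));
        }
        // 1. Allocate the slot (allocateSlot in the real code).
        allocatedSlots.add(allocationId);

        // 2. Get or create the job; undo the allocation if that fails.
        final SketchJob job;
        try {
            job = jobs.computeIfAbsent(jobId, jobFactory);
        } catch (RuntimeException e) {
            allocatedSlots.remove(allocationId);
            return failed(new IllegalStateException("Could not create new job.", e));
        }

        // 3. Offer slots only if the job already has a JobMaster connection.
        if (job.connected) {
            offeredJobs.add(jobId);
        }
        return CompletableFuture.completedFuture(null);
    }

    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```

If the job is not yet connected, the slots stay allocated but unoffered; in the real code they are offered later, once the JobMaster connection is established.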
org.apache.flink.runtime.taskexecutor.TaskExecutor#allocateSlot(slotId, jobId, allocationId, resourceProfile);
private void allocateSlot(
SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile)
throws SlotAllocationException {
// Example runtime values captured in a debugger:
// slotId = {SlotID@6055} "container_1619273419318_0032_01_000002_0"
// resourceId = {ResourceID@6114} "container_1619273419318_0032_01_000002"
// slotNumber = 0
// jobId = {JobID@6056} "05fdf1bc744b274be1525c918c1ad378"
// allocationId = {AllocationID@6057} "a9ce7abc6f1d6f264dbdce5564efcb76"
// resourceProfile = {ResourceProfile@6058} "ResourceProfile{UNKNOWN}"
// cpuCores = null
// taskHeapMemory = null
// taskOffHeapMemory = null
// managedMemory = null
// networkMemory = null
// extendedResources = {HashMap@6116} size = 0
// taskSlotTable = {TaskSlotTableImpl@6077}
// numberSlots = 4
// defaultSlotResourceProfile = {ResourceProfile@6124} "ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes, managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
// cpuCores = {CPUResource@6139} "Resource(CPU: 1.0000000000000000)"
// taskHeapMemory = {MemorySize@6140} "100663293 bytes"
// taskOffHeapMemory = {MemorySize@6141} "0 bytes"
// managedMemory = {MemorySize@6142} "134217730 bytes"
// networkMemory = {MemorySize@6143} "32 mb"
// extendedResources = {HashMap@6144} size = 0
// memoryPageSize = 32768
// timerService = {TimerService@6125}
// taskSlots = {HashMap@6126} size = 0
// allocatedSlots = {HashMap@6127} size = 0
// taskSlotMappings = {HashMap@6128} size = 0
// slotsPerJob = {HashMap@6129} size = 0
// slotActions = {TaskExecutor$SlotActionsImpl@6130}
// state = {TaskSlotTableImpl$State@6131} "RUNNING"
// budgetManager = {ResourceBudgetManager@6132}
// closingFuture = {CompletableFuture@6133} "java.util.concurrent.CompletableFuture@9a6e076[Not completed]"
// mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@6096}
// memoryVerificationExecutor = {ThreadPoolExecutor@6076} "java.util.concurrent.ThreadPoolExecutor@da5c1a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]"
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
// Perform the allocation
// see TaskSlotTableImpl#allocateSlot
if (taskSlotTable.allocateSlot(
slotId.getSlotNumber(),
jobId,
allocationId,
resourceProfile,
taskManagerConfiguration.getTimeout())) {
// Allocated slot for 3755cb8f9962a9a7738db04f2a02084c.
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
final String message =
"The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID =
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(
message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}
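`allocateSlot` makes a three-way decision: a free slot is allocated; a slot already held by the same job and allocation is left alone, so a repeated request for the same allocation passes without doing anything; a slot held by anyone else raises `SlotOccupiedException` carrying the current allocation. Below is a minimal sketch with hypothetical names (`SlotTableSketch`, `SlotOccupiedSketchException`); the exception is unchecked here only to keep the example short.

```java
// Hypothetical exception; unchecked only to keep the sketch short.
class SlotOccupiedSketchException extends RuntimeException {
    final String currentAllocation;

    SlotOccupiedSketchException(String message, String currentAllocation) {
        super(message);
        this.currentAllocation = currentAllocation;
    }
}

class SlotTableSketch {
    private final String[] allocationIds; // null = free
    private final String[] jobIds;

    SlotTableSketch(int numberSlots) {
        allocationIds = new String[numberSlots];
        jobIds = new String[numberSlots];
    }

    boolean isSlotFree(int slot) {
        return allocationIds[slot] == null;
    }

    boolean isAllocated(int slot, String jobId, String allocationId) {
        return jobId.equals(jobIds[slot]) && allocationId.equals(allocationIds[slot]);
    }

    void allocateSlot(int slot, String jobId, String allocationId) {
        if (isSlotFree(slot)) {
            // Case 1: free slot -> take it.
            allocationIds[slot] = allocationId;
            jobIds[slot] = jobId;
        } else if (!isAllocated(slot, jobId, allocationId)) {
            // Case 3: someone else holds it -> report the current allocation.
            throw new SlotOccupiedSketchException(
                    "The slot " + slot + " has already been allocated for a different job.",
                    allocationIds[slot]);
        }
        // Case 2: the same job and allocation already hold the slot -> nothing to do.
    }
}
```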
org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager(jobId);
// ------------------------------------------------------------------------
// Internal job manager connection methods
// ------------------------------------------------------------------------
private void offerSlotsToJobManager(final JobID jobId) {
// Offer the slots to the JobManager via internalOfferSlotsToJobManager
jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
}
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
// Get the JobID
final JobID jobId = jobManagerConnection.getJobId();
// Only proceed if slots are allocated to this job
if (taskSlotTable.hasAllocatedSlots(jobId)) {
// Offer reserved slots to the leader of job 694474d11da6100e82744c9e47e2f511.
log.info("Offer reserved slots to the leader of job {}.", jobId);
// Get the JobMaster gateway
final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
// Get all TaskSlots allocated to this job
final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
// Get the JobMasterId
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
// Turn the reserved slots into SlotOffers
final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}
// offerSlots
// Offers the given slots to the job manager.
// The response contains the set of accepted slots.
// JobMaster#offerSlots
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
jobMasterGateway.offerSlots(
getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());
// Handle the response asynchronously on the main thread: deal with failures and mark accepted slots as active
acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
getMainThreadExecutor());
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
}
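`internalOfferSlotsToJobManager` sends all slots reserved for the job in one `offerSlots` call and handles the reply asynchronously on the RPC main thread via `whenCompleteAsync(..., getMainThreadExecutor())`. According to the comment on that call, the handler deals with failures and marks accepted slots as active; the sketch below additionally assumes that rejected slots, and all offered slots when the call fails, are freed. The JobMaster is simulated by a function and a daemon single-thread executor stands in for the main-thread executor; `OfferSketch` and its fields are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class OfferSketch {
    // Stand-in for the RPC main-thread executor; daemon so the JVM can exit.
    private final ExecutorService mainThread =
            Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, "main-thread-sketch");
                thread.setDaemon(true);
                return thread;
            });
    private final Set<String> reserved; // allocation ids reserved for the job
    final Set<String> active = new HashSet<>();
    final Set<String> freed = new HashSet<>();

    OfferSketch(String... reservedAllocationIds) {
        this.reserved = new HashSet<>(Arrays.asList(reservedAllocationIds));
    }

    // jobMaster simulates JobMasterGateway#offerSlots: it returns the accepted ids.
    CompletableFuture<Void> offerSlots(Function<Set<String>, Set<String>> jobMaster) {
        final Set<String> offers = new HashSet<>(reserved);
        return CompletableFuture
                .supplyAsync(() -> jobMaster.apply(offers)) // the "remote" call
                .handleAsync(
                        (accepted, failure) -> {
                            // Always runs on the main-thread executor.
                            for (String allocationId : offers) {
                                if (failure == null && accepted.contains(allocationId)) {
                                    active.add(allocationId);
                                } else {
                                    freed.add(allocationId);
                                }
                            }
                            return null;
                        },
                        mainThread);
    }
}
```

Running the handler on a single dedicated thread is what lets the real TaskExecutor mutate its slot table without locks.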
3.2.2. freeSlot
Frees the slot with the given allocation ID.
@Override
public CompletableFuture<Acknowledge> freeSlot(
AllocationID allocationId, Throwable cause, Time timeout) {
freeSlotInternal(allocationId, cause);
return CompletableFuture.completedFuture(Acknowledge.get());
}
private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
checkNotNull(allocationId);
log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());
try {
final JobID jobId = taskSlotTable.getOwningJob(allocationId);
// Free the slot; returns the slot index, or -1 if the slot could not be freed
final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
if (slotIndex != -1) {
if (isConnectedToResourceManager()) {
// The slot was freed: get the RM gateway and tell the RM about it
ResourceManagerGateway resourceManagerGateway =
establishedResourceManagerConnection.getResourceManagerGateway();
// Notify the RM that the slot is available again
resourceManagerGateway.notifySlotAvailable(
establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
new SlotID(getResourceID(), slotIndex),
allocationId);
}
if (jobId != null) {
closeJobManagerConnectionIfNoAllocatedResources(jobId);
}
}
} catch (SlotNotFoundException e) {
log.debug("Could not free slot for allocation id {}.", allocationId, e);
}
// Release the local state stored under this allocation ID
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
}
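`freeSlotInternal` only tells the ResourceManager about the slot when `taskSlotTable.freeSlot` returns a real index; `-1` means the slot could not be freed right away. Local state for the allocation is released in every case, even when the slot is unknown. The sketch models the `-1` case as "the slot still has running tasks", which is an assumption made for illustration, and it leaves out closing the JobManager connection; `FreeSlotSketch` and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FreeSlotSketch {
    private final Map<String, Integer> slotIndexByAllocation = new HashMap<>();
    private final Map<String, Integer> runningTasks = new HashMap<>();
    final List<Integer> slotsReportedToRm = new ArrayList<>();
    final List<String> releasedLocalState = new ArrayList<>();
    boolean connectedToResourceManager = true;

    void allocate(String allocationId, int slotIndex, int tasksRunning) {
        slotIndexByAllocation.put(allocationId, slotIndex);
        runningTasks.put(allocationId, tasksRunning);
    }

    // Assumed contract of TaskSlotTable#freeSlot: the slot index, or -1 if it
    // cannot be freed right now (modelled here as "tasks still running").
    private int freeSlot(String allocationId) {
        Integer index = slotIndexByAllocation.get(allocationId);
        if (index == null) {
            throw new IllegalArgumentException("Unknown allocation " + allocationId);
        }
        if (runningTasks.get(allocationId) > 0) {
            return -1;
        }
        slotIndexByAllocation.remove(allocationId);
        runningTasks.remove(allocationId);
        return index;
    }

    void freeSlotInternal(String allocationId) {
        try {
            int slotIndex = freeSlot(allocationId);
            if (slotIndex != -1 && connectedToResourceManager) {
                // Real code: resourceManagerGateway.notifySlotAvailable(...)
                slotsReportedToRm.add(slotIndex);
            }
        } catch (IllegalArgumentException unknownSlot) {
            // Real code catches SlotNotFoundException and only logs it.
        }
        // Local state is released whether or not the slot was found.
        releasedLocalState.add(allocationId);
    }
}
```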
3.2.3. submitTask
At its core, submitTask builds a Task and hands it to its own thread for execution.
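Before any `Task` is built, `submitTask` runs three checks visible in the code below: a JobManager connection must exist for the job, the caller's `JobMasterId` must match the connected leader, and `tryMarkSlotActive` must find a slot allocated to that job and allocation ID. Only then is the task started on its own thread. This is a minimal sketch of that gatekeeping with hypothetical names (`SubmitTaskSketch`, string IDs, a plain `Runnable` in place of the real task); the real method also loads offloaded data, creates the `Task` and registers it in the slot table, all omitted here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class SubmitTaskSketch {
    final Map<String, String> leaderIdByJob = new HashMap<>(); // jobId -> connected JobMasterId
    final Set<String> allocatedSlots = new HashSet<>(); // "jobId/allocationId"
    final Set<String> activeSlots = new HashSet<>();

    CompletableFuture<Void> submitTask(
            String jobId, String allocationId, String jobMasterId, Runnable taskBody) {
        // Check 1: a JobManager connection must exist for this job.
        String connectedLeader = leaderIdByJob.get(jobId);
        if (connectedLeader == null) {
            return failed("No JobManager associated for the job " + jobId + '.');
        }
        // Check 2: the caller must be the currently connected leader.
        if (!Objects.equals(connectedLeader, jobMasterId)) {
            return failed("Leader id " + jobMasterId + " does not match " + connectedLeader + '.');
        }
        // Check 3: a slot must be allocated to this job/allocation (tryMarkSlotActive).
        String slotKey = jobId + "/" + allocationId;
        if (!allocatedSlots.contains(slotKey)) {
            return failed("No task slot allocated for job ID " + jobId
                    + " and allocation ID " + allocationId + '.');
        }
        activeSlots.add(slotKey);

        // The task gets its own thread; the RPC returns without waiting for it.
        Thread taskThread = new Thread(taskBody, "task-" + allocationId);
        taskThread.start();
        return CompletableFuture.completedFuture(null);
    }

    private static CompletableFuture<Void> failed(String message) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException(message));
        return future;
    }
}
```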
// ----------------------------------------------------------------------
// Task lifecycle RPCs
// Submitting tasks
// ----------------------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
try {
final JobID jobId = tdd.getJobId();
final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
final JobTable.Connection jobManagerConnection =
jobTable.getConnection(jobId)
.orElseThrow(
() -> {
final String message =
"Could not submit task because there is no JobManager "
+ "associated for the job "
+ jobId
+ '.';
log.debug(message);
return new TaskSubmissionException(message);
});
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message =
"Rejecting the task submission because the job manager leader id "
+ jobMasterId
+ " does not match the expected job manager leader id "
+ jobManagerConnection.getJobMasterId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message =
"No task slot allocated for job ID "
+ jobId
+ " and allocation ID "
+ tdd.getAllocationId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data:
try {
tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {