[Flink] Flink Compute Resource Management

Posted by 九师兄

Editor's note: this article was compiled by the editors of 小常识网 (cha138.com) and covers Flink compute resource management.

1. Overview

Reposted from: Flink Source Code Reading Notes (6) - Compute Resource Management

In Flink, compute resources are allocated in units of slots. This article analyzes how Flink manages compute resources.

2. Basic Concepts of Task Slots

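An earlier article covered how a Flink cluster starts up. In a Flink cluster, every TaskManager is a separate JVM process (outside MiniCluster mode). A single TaskManager may run several subtasks, and each subtask runs in its own thread. Flink introduces the Task Slot to control how many tasks one TaskManager can run.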

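Each Task Slot represents a fixed subset of the compute resources owned by a TaskManager. For example, in a TaskManager with 3 slots, each slot can use 1/3 of the TaskManager's managed memory. Subtasks running in different slots therefore do not compete for that memory. Flink currently does not isolate CPU between slots; it only isolates memory.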
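The 1/3 example can be worked through with a little arithmetic. This sketch assumes the managed memory is split evenly across slots. `SlotMemory` and `perSlotBytes` are made-up names for illustration, not Flink APIs.

```java
/** Hypothetical helper (not a Flink API): even split of managed memory across slots. */
final class SlotMemory {
    private SlotMemory() {}

    /** Bytes of managed memory each slot may use, assuming an even split. */
    static long perSlotBytes(long managedMemoryBytes, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        // Integer division: in this sketch any remainder is simply left unassigned.
        return managedMemoryBytes / numberOfSlots;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024;
        System.out.println(perSlotBytes(threeGiB, 3)); // 1073741824, i.e. 1 GiB per slot
    }
}
```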

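The number of slots controls how strongly subtasks are isolated from one another. With only 1 slot per TaskManager, each group of subtasks runs in its own JVM process. With several slots per TaskManager, more subtasks share the same JVM. Subtasks in the same JVM process can share TCP connections and heartbeat messages, which reduces network overhead. They can also share some data structures, which lowers the per-subtask cost to some extent.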

By default, Flink lets subtasks share a slot as long as they belong to the same Job and are not subtasks of the same operator. As a result, one slot may hold an entire pipeline of the Job. Slot sharing has two main benefits:

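  1. To work out how many slots a Job needs, Flink only has to know the Job's highest parallelism. It does not need to consider the parallelism of every individual task.
  2. Resources are used more efficiently. Without slot sharing, a subtask with small resource needs occupies as much as a subtask with large needs. With slot sharing, the two can be placed in the same slot.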
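The first benefit can be made concrete with a small sketch. It assumes every operator stays in the default slot sharing group. `SlotDemand`, `withSharing` and `withoutSharing` are hypothetical names, and the pipeline parallelisms are invented for the example.

```java
import java.util.Arrays;

/** Hypothetical helper: slot demand of a job from its per-operator parallelism. */
final class SlotDemand {
    private SlotDemand() {}

    /** With slot sharing (one shared group), a job needs as many slots as its highest parallelism. */
    static int withSharing(int... parallelisms) {
        return Arrays.stream(parallelisms).max().orElse(0);
    }

    /** Without slot sharing, every subtask of every operator needs a slot of its own. */
    static int withoutSharing(int... parallelisms) {
        return Arrays.stream(parallelisms).sum();
    }

    public static void main(String[] args) {
        // source (parallelism 2) -> map (6) -> sink (1)
        System.out.println(withSharing(2, 6, 1));    // 6
        System.out.println(withoutSharing(2, 6, 1)); // 9
    }
}
```

The gap between the two numbers grows with the number of operators, which is why sharing makes slot demand easy to predict.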

Flink uses SlotSharingGroup and CoLocationGroup to decide how resources are shared when tasks are scheduled. They correspond to two kinds of constraints:

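  • SlotSharingGroup: subtasks of different JobVertices in the same SlotSharingGroup may be placed in the same slot, but this is not guaranteed;

  • CoLocationGroup: for different JobVertices in the same CoLocationGroup, their n-th subtasks must all run in the same slot. This is a hard constraint.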
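A hard constraint like CoLocationGroup can be expressed as a check over a finished placement. The sketch below is hypothetical: `CoLocationCheck` is not part of Flink. It assumes that all co-located vertices have the same parallelism, and that each placement is given as the list of slot names for subtasks 0..n-1.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical checker for the CoLocationGroup constraint; not a Flink API. */
final class CoLocationCheck {
    private CoLocationCheck() {}

    /**
     * placement maps a JobVertex name to the slot of each of its subtasks
     * (list index n = subtask n). Returns true only if, for every n, the n-th
     * subtasks of all vertices in the group were placed in the same slot.
     */
    static boolean satisfied(List<String> group, Map<String, List<String>> placement) {
        if (group.isEmpty()) {
            return true;
        }
        List<String> reference = placement.get(group.get(0));
        if (reference == null) {
            return false;
        }
        for (String vertex : group) {
            List<String> slots = placement.get(vertex);
            if (slots == null || slots.size() != reference.size()) {
                return false; // unplaced vertex or mismatched parallelism
            }
            for (int n = 0; n < slots.size(); n++) {
                if (!slots.get(n).equals(reference.get(n))) {
                    return false; // the n-th subtasks ended up in different slots
                }
            }
        }
        return true;
    }
}
```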

3. Slot Management in the TaskExecutor

3.1 TaskSlot

First, let's look at how slots are managed in the TaskManager, that is, the TaskExecutor.

SlotID uniquely identifies a slot and has two fields. ResourceID identifies the TaskExecutor the slot belongs to, and slotNumber is the slot's index within that TaskExecutor.

TaskSlot is the TaskExecutor's abstraction of a slot. It can be in one of four states: Free, Releasing, Allocated and Active. Its main fields are:

```java
class TaskSlot {
	/** Index of the task slot. */
	private final int index;

	/** State of this slot. */
	private TaskSlotState state;

	/** Resource characteristics for this slot. */
	private final ResourceProfile resourceProfile;

	/** Tasks running in this slot. */
	// A single slot may run several Tasks
	private final Map<ExecutionAttemptID, Task> tasks;

	/** Job id to which the slot has been allocated; null if not allocated. */
	private JobID jobId;

	/** Allocation id of this slot; null if not allocated. */
	private AllocationID allocationId;
}
```

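TaskSlot provides methods for changing its state:

- allocate(JobID newJobId, AllocationID newAllocationId) marks the slot as Allocated.
- markFree() marks the slot as Free. It only succeeds once all Tasks have been removed from the slot.

Before switching state, a slot first checks which state it is currently in. Tasks are added to a slot with add(Task task), and all Tasks in one slot must come from the same Job.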
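The transitions just described can be modelled as a small state machine. This is a simplified, hypothetical sketch and not Flink's TaskSlot:

- Ids are plain Strings and a task is represented only by its id.
- `add` only succeeds in the ACTIVE state. This is an assumption of the sketch.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Simplified, hypothetical model of TaskSlot's state machine (not Flink's class). */
final class SimpleTaskSlot {
    enum State { FREE, ALLOCATED, ACTIVE, RELEASING }

    private State state = State.FREE;
    private String jobId;         // null while FREE
    private String allocationId;  // null while FREE
    private final Set<String> taskIds = new HashSet<>();

    State state() {
        return state;
    }

    /** FREE -> ALLOCATED; repeating the same allocation is accepted, anything else refused. */
    boolean allocate(String newJobId, String newAllocationId) {
        if (state == State.FREE) {
            jobId = newJobId;
            allocationId = newAllocationId;
            state = State.ALLOCATED;
            return true;
        }
        if (state == State.ALLOCATED || state == State.ACTIVE) {
            return Objects.equals(jobId, newJobId) && Objects.equals(allocationId, newAllocationId);
        }
        return false; // RELEASING
    }

    /** ALLOCATED -> ACTIVE (idempotent when already ACTIVE). */
    boolean markActive() {
        if (state == State.ALLOCATED || state == State.ACTIVE) {
            state = State.ACTIVE;
            return true;
        }
        return false;
    }

    /** All tasks must belong to the slot's job; ACTIVE is required (assumption of this sketch). */
    boolean add(String taskId, String taskJobId) {
        if (!Objects.equals(jobId, taskJobId)) {
            throw new IllegalArgumentException("Task belongs to a different job than the slot");
        }
        return state == State.ACTIVE && taskIds.add(taskId);
    }

    void remove(String taskId) {
        taskIds.remove(taskId);
    }

    /** Moves an occupied slot to RELEASING while its tasks finish. */
    void markReleasing() {
        state = State.RELEASING;
    }

    /** Only succeeds once no tasks are left in the slot. */
    boolean markFree() {
        if (!taskIds.isEmpty()) {
            return false;
        }
        state = State.FREE;
        jobId = null;
        allocationId = null;
        return true;
    }
}
```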

3.2 TaskSlotTable

The TaskExecutor manages all of its slots mainly through TaskSlotTable:

```java
class TaskSlotTable implements TimeoutListener<AllocationID> {
	/** Timer service used to time out allocated slots. */
	private final TimerService<AllocationID> timerService;

	/** The list of all task slots. */
	private final List<TaskSlot> taskSlots;

	/** Mapping from allocation id to task slot. */
	private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;

	/** Mapping from execution attempt id to task and task slot. */
	private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;

	/** Mapping from job id to allocated slots for a job. */
	private final Map<JobID, Set<AllocationID>> slotsPerJob;

	/** Interface for slot actions, such as freeing them or timing them out. */
	private SlotActions slotActions;
}
```

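allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) allocates the slot at the given index to the request identified by the AllocationID. Internally it calls TaskSlot.allocate(JobID newJobId, AllocationID newAllocationId).

Note that the last parameter of allocateSlot is a timeout. TaskSlotTable has a member TimerService timerService, which is used to register timers. If a timer has not been cancelled by the time its timeout expires, the timeout callback of slotActions is invoked. In other words, if the timer registered for an allocated slot is not cancelled before the timeout, the slot is released again and marked Free.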
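This timer pattern can be sketched with the JDK's ScheduledExecutorService. The sketch is a hypothetical stand-in for Flink's TimerService; the class `SlotTimeouts` and its methods are invented here. The mapping to the real code is:

- `registerTimeout` corresponds to allocateSlot.
- `unregisterTimeout` corresponds to markSlotActive.
- The callback corresponds to freeing the slot.

A per-registration ticket ensures that a timer which was cancelled or replaced never fires.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of allocate -> (activate | timeout); not Flink's TimerService. */
final class SlotTimeouts {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "slot-timeouts");
        t.setDaemon(true); // do not keep the JVM alive for pending timers
        return t;
    });
    private final Map<String, Object> tickets = new HashMap<>();
    private final Map<String, ScheduledFuture<?>> futures = new HashMap<>();
    private final Consumer<String> onTimeout; // e.g. frees the slot of this allocation id

    SlotTimeouts(Consumer<String> onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Called when a slot becomes ALLOCATED (cf. allocateSlot). */
    synchronized void registerTimeout(String allocationId, long delay, TimeUnit unit) {
        unregisterTimeout(allocationId); // replace any earlier timer for the same id
        Object ticket = new Object();
        tickets.put(allocationId, ticket);
        futures.put(allocationId, scheduler.schedule(() -> fire(allocationId, ticket), delay, unit));
    }

    /** Called when a slot becomes ACTIVE (cf. markSlotActive); true if a pending timeout was removed. */
    synchronized boolean unregisterTimeout(String allocationId) {
        ScheduledFuture<?> future = futures.remove(allocationId);
        if (future != null) {
            future.cancel(false);
        }
        return tickets.remove(allocationId) != null;
    }

    private void fire(String allocationId, Object ticket) {
        synchronized (this) {
            if (tickets.get(allocationId) != ticket) {
                return; // cancelled or superseded in the meantime
            }
            tickets.remove(allocationId);
            futures.remove(allocationId);
        }
        onTimeout.accept(allocationId); // run the callback outside the lock
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```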

```java
class TaskSlotTable {
	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
		checkInit();
		TaskSlot taskSlot = taskSlots.get(index);
		boolean result = taskSlot.allocate(jobId, allocationId);
		if (result) {
			// update the allocation id to task slot map
			allocationIDTaskSlotMap.put(allocationId, taskSlot);
			// register a timeout for this slot since it's in state allocated
			timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
			// add this slot to the set of job slots
			Set<AllocationID> slots = slotsPerJob.get(jobId);
			if (slots == null) {
				slots = new HashSet<>(4);
				slotsPerJob.put(jobId, slots);
			}
			slots.add(allocationId);
		}
		return result;
	}
}
```

When a slot is marked Active, the timer registered when the slot was allocated is cancelled:

```java
class TaskSlotTable {
	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
		checkInit();
		TaskSlot taskSlot = getTaskSlot(allocationId);
		if (taskSlot != null) {
			if (taskSlot.markActive()) {
				// unregister a potential timeout
				LOG.info("Activate slot {}.", allocationId);
				timerService.unregisterTimeout(allocationId);
				return true;
			} else {
				return false;
			}
		} else {
			throw new SlotNotFoundException(allocationId);
		}
	}
}
```

createSlotReport returns a SlotReport object. It contains the state of every slot in the current TaskExecutor and how each slot is allocated.
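Below is a minimal sketch of the information such a report carries, assuming one entry per slot. The class and field names are illustrative only; Flink's actual SlotReport and SlotStatus classes differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified SlotReport (not Flink's SlotReport/SlotStatus classes). */
final class SlotReportSketch {
    static final class SlotStatus {
        final String resourceId;   // TaskExecutor the slot belongs to
        final int slotNumber;      // index inside that TaskExecutor
        final String jobId;        // null when the slot is free
        final String allocationId; // null when the slot is free

        SlotStatus(String resourceId, int slotNumber, String jobId, String allocationId) {
            this.resourceId = resourceId;
            this.slotNumber = slotNumber;
            this.jobId = jobId;
            this.allocationId = allocationId;
        }

        boolean isFree() {
            return allocationId == null;
        }
    }

    /** jobIds[i] and allocationIds[i] describe slot i; null entries mean "free". */
    static List<SlotStatus> createSlotReport(String resourceId, String[] jobIds, String[] allocationIds) {
        if (jobIds.length != allocationIds.length) {
            throw new IllegalArgumentException("one job id and one allocation id per slot");
        }
        List<SlotStatus> report = new ArrayList<>(allocationIds.length);
        for (int i = 0; i < allocationIds.length; i++) {
            report.add(new SlotStatus(resourceId, i, jobIds[i], allocationIds[i]));
        }
        return report;
    }
}
```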

4. TaskExecutor

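The TaskExecutor has to report the state of all its slots to the ResourceManager, so that the ResourceManager knows how every slot is allocated. This happens in two situations:

  • When the TaskExecutor first establishes a connection with the ResourceManager, it sends a SlotReport;
  • The TaskExecutor and the ResourceManager exchange heartbeats periodically, and each heartbeat payload contains a SlotReport.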
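On the ResourceManager side, the effect of these reports can be sketched as a view that is overwritten with each TaskExecutor's latest report and then queried for a free slot. `RmSlotView` is hypothetical and much simpler than Flink's SlotManager. Slots are identified here as `resourceId#slotNumber`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical RM-side view of slots, rebuilt from each report; not Flink's SlotManager. */
final class RmSlotView {
    // resourceId -> allocation id per slot (null = free), replaced wholesale on every report
    private final Map<String, String[]> latestReports = new HashMap<>();

    /** Called on first registration and on every heartbeat that carries a report. */
    void onSlotReport(String resourceId, String[] allocationIds) {
        latestReports.put(resourceId, allocationIds.clone());
    }

    /** Returns some free slot as "resourceId#slotNumber", if any TaskExecutor reported one. */
    Optional<String> findFreeSlot() {
        for (Map.Entry<String, String[]> entry : latestReports.entrySet()) {
            String[] slots = entry.getValue();
            for (int i = 0; i < slots.length; i++) {
                if (slots[i] == null) {
                    return Optional.of(entry.getKey() + "#" + i);
                }
            }
        }
        return Optional.empty();
    }
}
```

Because the view always reflects concrete slots, a request built from it can name an exact slot, which is what requestSlot relies on.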

Here is the relevant code:

```java
class TaskExecutor {
	private void establishResourceManagerConnection(
			ResourceManagerGateway resourceManagerGateway,
			ResourceID resourceManagerResourceId,
			InstanceID taskExecutorRegistrationId,
			ClusterInformation clusterInformation) {
		// First connection: report slot information to the RM
		final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
			getResourceID(),
			taskExecutorRegistrationId,
			taskSlotTable.createSlotReport(getResourceID()),
			taskManagerConfiguration.getTimeout());

		//.........
	}

	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, SlotReport> {
		// Heartbeat payload
		@Override
		public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
			return callAsync(
					() -> taskSlotTable.createSlotReport(getResourceID()),
					taskManagerConfiguration.getTimeout());
		}
	}
}
```

The ResourceManager asks a TaskExecutor to allocate a slot by calling TaskExecutor.requestSlot. Because the ResourceManager knows the current state of every slot, each allocation request targets a specific SlotID:

```java
class TaskExecutor {
	@Override
	public CompletableFuture<Acknowledge> requestSlot(
		final SlotID slotId,
		final JobID jobId,
		final AllocationID allocationId,
		final String targetAddress,
		final ResourceManagerId resourceManagerId,
		final Time timeout) {
		try {
			// Check that the requesting RM is the one this TaskExecutor is registered with
			if (!isConnectedToResourceManager(resourceManagerId)) {
				final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
				log.debug(message);
				throw new TaskManagerException(message);
			}

			if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
				// The slot is Free, so allocate it
				if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
					log.info("Allocated slot for {}.", allocationId);
				} else {
					log.info("Could not allocate slot for {}.", allocationId);
					throw new SlotAllocationException("Could not allocate slot.");
				}
			} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
				// The slot is already allocated to another job/allocation, so throw
				final String message = "The slot " + slotId + " has already been allocated for a different job.";
				log.info(message);
				final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
				throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
			}

			// Offer the allocated slot to the JobManager that requested it
			if (jobManagerTable.contains(jobId)) {
				// Already connected to that JobManager: offer the slot right away
				offerSlotsToJobManager(jobId);
			} else {
				// Otherwise connect to the JobManager first; offerSlotsToJobManager(jobId) is called once connected
				try {
					jobLeaderService.addJob(jobId, targetAddress);
				} catch (Exception e) {
					// free the allocated slot
					try {
						taskSlotTable.freeSlot(allocationId);
					} catch (SlotNotFoundException slotNotFoundException) {
						// slot no longer existent, this should actually never happen, because we've
						// just allocated the slot. So let's fail hard in this case!
						onFatalError(slotNotFoundException);
					}

					// release local state under the allocation id.
					localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
					// sanity check
					if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
						onFatalError(new Exception("Could not free slot " + slotId));
					}
					throw new SlotAllocationException("Could not add job to job leader service.", e);
				}
			}
		} catch (TaskManagerException taskManagerException) {
			return FutureUtils.completedExceptionally(taskManagerException);
		}

		return CompletableFuture.completedFuture(Acknowledge.get());
	}
}
```
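The slot checks in requestSlot come down to three cases:

- The slot is free, so it is allocated.
- The slot is already held by exactly this job and allocation, so the request still succeeds.
- The slot is held by anyone else, so the request fails.

The hypothetical `RequestSlotSketch` below models only these checks. It leaves out the RM and JobManager connection handling, and it uses IllegalStateException in place of SlotOccupiedException.

```java
import java.util.Objects;

/** Hypothetical sketch (not Flink code) of requestSlot's slot checks on one TaskExecutor. */
final class RequestSlotSketch {
    enum Outcome { NEWLY_ALLOCATED, ALREADY_ALLOCATED_TO_THIS_REQUEST }

    private final String[] jobIds;        // owning job per slot, null when free
    private final String[] allocationIds; // allocation per slot, null when free

    RequestSlotSketch(int numberOfSlots) {
        jobIds = new String[numberOfSlots];
        allocationIds = new String[numberOfSlots];
    }

    Outcome requestSlot(int slotNumber, String jobId, String allocationId) {
        if (allocationIds[slotNumber] == null) {
            // Free slot: allocate it (the real allocateSlot also registers a timeout here).
            jobIds[slotNumber] = jobId;
            allocationIds[slotNumber] = allocationId;
            return Outcome.NEWLY_ALLOCATED;
        }
        if (Objects.equals(jobIds[slotNumber], jobId) && Objects.equals(allocationIds[slotNumber], allocationId)) {
            // Same job and allocation asked again: not an error, as in the original code.
            return Outcome.ALREADY_ALLOCATED_TO_THIS_REQUEST;
        }
        // Stands in for SlotOccupiedException.
        throw new IllegalStateException("Slot " + slotNumber + " is held by allocation " + allocationIds[slotNumber]);
    }
}
```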

Once a slot has been allocated, the TaskExecutor must offer it to the JobManager. This is done in offerSlotsToJobManager(jobId):

```java
class TaskExecutor {
	private void offerSlotsToJobManager(final JobID jobId) {
		final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
		if (jobManagerConnection == null) {
			log.debug("There is no job manager connection to the leader of job {}.", jobId);
		} else {
			if (taskSlotTable.hasAllocatedSlots(jobId)) {
				log.info("Offer reserved slots to the leader of job {}.", jobId);
				final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
				// Slots allocated to this Job; only slots in the Allocated state are returned
				final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
				final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
				while (reservedSlotsIterator.hasNext()) {
					SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
					reservedSlots.add(offer);
				}
				// Offer the slots to the JobMaster via RPC
				CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
					getResourceID(),
					reservedSlots,
					taskManagerConfiguration.getTimeout());

				acceptedSlotsFuture.whenCompleteAsync(
					(Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
						if (throwable != null) {
							// On timeout, retry
							if (throwable instanceof TimeoutException) {
								log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
								// We ran into a timeout. Try again.
								offerSlotsToJobManager(jobId);
							} else {
								log.warn("Slot offering to JobManager failed. Freeing the slots " +
									"and returning them to the ResourceManager.", throwable);
								// Any other failure: free all the reserved slots
								for (SlotOffer reservedSlot: reservedSlots) {
									freeSlotInternal(reservedSlot.getAllocationId(), throwable);
								}
							}
						} else {
							// The call succeeded
							// check if the response is still valid
							if (isJobManagerConnectionValid(jobId, jobMasterId)) {
								// mark accepted slots active
								// Slots the JobMaster accepted are marked Active
								for (SlotOffer acceptedSlot : acceptedSlots) {
									try {
										if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
											// the slot is either free or releasing at the moment
											final String message = "Could not mark slot " + jobId + " active.";
											log.debug(message);
											jobMasterGateway.failSlot(
												getResourceID(),
												acceptedSlot.getAllocationId(),
												new FlinkException(message));
										}
									} catch (SlotNotFoundException e) {
										final String message = "Could not mark slot " + jobId + " active.";
										jobMasterGateway.failSlot(
											getResourceID(),
											acceptedSlot.getAllocationId(),
											new FlinkException(message));
									}

									reservedSlots.remove(acceptedSlot);
								}

								final Exception e = new Exception("The slot was rejected by the JobManager.");
								// Free the remaining slots that were not accepted
								for (SlotOffer rejectedSlot : reservedSlots) {
									freeSlotInternal(rejectedSlot.getAllocationId(), e);
								}
							} else {
								// discard the response since there is a new leader for the job
								log.debug("Discard offer slot response since there is a new leader " +
									"for the job {}.", jobId);
							}
						}
					},
					getMainThreadExecutor());
			} else {
				log.debug("There are no unassigned slots for the job {}.", jobId);
			}
		}
	}
}
```
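The way offerSlotsToJobManager handles the JobMaster's answer can be modelled without any RPC machinery:

- Accepted slots become Active.
- Slots that were not accepted are freed.
- A timeout triggers another offer, and any other failure frees every reserved slot.

In the hypothetical `SlotOfferSketch` below, allocation ids are Strings and the `sender` function stands in for the offerSlots RPC. Like the quoted code, it retries without a limit. Unlike the quoted code, it also unwraps CompletionException, which CompletableFuture may wrap around the real cause in dependent stages.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical model of offerSlotsToJobManager's response handling; not Flink code. */
final class SlotOfferSketch {
    final Set<String> active = new HashSet<>(); // accepted slots, marked ACTIVE
    final Set<String> freed = new HashSet<>();  // rejected or failed slots, freed
    int attempts = 0;

    void offer(Set<String> reserved, Function<Set<String>, CompletableFuture<Set<String>>> sender) {
        attempts++;
        sender.apply(reserved).whenComplete((accepted, error) -> {
            if (error != null) {
                Throwable cause = error instanceof CompletionException ? error.getCause() : error;
                if (cause instanceof TimeoutException) {
                    offer(reserved, sender);   // timed out: offer the same slots again
                } else {
                    freed.addAll(reserved);    // any other failure: free every reserved slot
                }
                return;
            }
            Set<String> rejected = new HashSet<>(reserved);
            for (String allocationId : accepted) {
                active.add(allocationId);      // accepted -> ACTIVE
                rejected.remove(allocationId);
            }
            freed.addAll(rejected);            // not accepted -> freed
        });
    }
}
```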

freeSlot(AllocationID, Throwable, Time) asks the TaskExecutor to free the slot associated with the given AllocationID:

```java
class TaskExecutor {
	@Override
	public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
		freeSlotInternal(allocationId, cause);
		return CompletableFuture.completedFuture(Acknowledge.get());
	}

	private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
		checkNotNull(allocationId);
		try {
			final JobID jobId = taskSlotTable.getOwningJob(allocationId);
			// Try to free the slot bound to allocationId
			final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

			if (slotIndex != -1) {
				// The slot was freed successfully
				if (isConnectedToResourceManager()) {
					// Tell the ResourceManager the slot is available again
					// the slot was freed. Tell the RM about it
					ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
					resourceManagerGateway.notifySlotAvailable(
						establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
						new SlotID(getResourceID(), slotIndex),
						allocationId);
				}
				if (jobId != null) {
					// If the Job bound to this allocation has no allocated slots left, the connection to its JobMaster can be dropped
					// check whether we still have allocated slots for the same job
					if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
						// we can remove the job from the job leader service
						try {
							jobLeaderService.removeJob(jobId);
						} catch (Exception e) {
							log.info("Could not remove job {} from JobLeaderService.",
							//.........
```
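The bookkeeping in freeSlotInternal has three steps: free the slot, tell the ResourceManager, and drop the Job once it holds no more slots. These steps can be modelled as below. This is a hypothetical simplification (`FreeSlotSketch` is not Flink code). It frees a slot immediately even if tasks are still running, whereas the real TaskSlotTable.freeSlot can return -1 when the slot cannot be freed yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of freeSlotInternal's bookkeeping; not Flink's TaskExecutor. */
final class FreeSlotSketch {
    private final Map<String, Integer> slotByAllocation = new HashMap<>();
    private final Map<String, String> jobByAllocation = new HashMap<>();
    private final Map<String, Set<String>> allocationsPerJob = new HashMap<>();
    final List<Integer> notifiedToRm = new ArrayList<>(); // stands in for notifySlotAvailable calls
    final Set<String> jobsRemoved = new HashSet<>();      // stands in for jobLeaderService.removeJob
    boolean connectedToRm = true;

    void allocate(int slot, String jobId, String allocationId) {
        slotByAllocation.put(allocationId, slot);
        jobByAllocation.put(allocationId, jobId);
        allocationsPerJob.computeIfAbsent(jobId, j -> new HashSet<>()).add(allocationId);
    }

    void freeSlot(String allocationId) {
        Integer slot = slotByAllocation.remove(allocationId);
        if (slot == null) {
            return; // unknown allocation: nothing to free
        }
        String jobId = jobByAllocation.remove(allocationId);
        if (connectedToRm) {
            notifiedToRm.add(slot); // tell the RM the slot is available again
        }
        Set<String> remaining = allocationsPerJob.get(jobId);
        remaining.remove(allocationId);
        if (remaining.isEmpty()) {
            allocationsPerJob.remove(jobId);
            jobsRemoved.add(jobId); // no slots left for this job: stop tracking its leader
        }
    }
}
```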