[Flink] Flink 1.12.2 Task Scheduling: Source Code
Posted by 九师兄
1. Preface
Once the ExecutionGraph has been generated, Flink can create concrete Tasks from it and schedule them onto TaskManagers for execution.
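2. Concepts
The scheduler is the core component of Flink job execution. It manages everything involved in running a job, including:
- converting the JobGraph into an ExecutionGraph;
- job lifecycle management (deploying, canceling, and stopping the job);
- task lifecycle management (deploying, canceling, and stopping tasks);
- requesting and releasing resources;
- failover of jobs and tasks.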
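Scheduling involves several important components:
- Scheduler: SchedulerNG and its subclasses and implementations
- Scheduling strategy: SchedulingStrategy and its implementations
- Schedule mode: ScheduleMode, which covers both streaming and batch scheduling, each with its own mode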
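2.1. Scheduler
Responsibilities of the scheduler:
1) Job lifecycle management, such as deploying, suspending, and canceling a job
2) Requesting, allocating, and releasing the resources the job runs on
3) Job state management: the state transitions while the job is being deployed, and failover when the job hits an exception
4) Providing job information: exposing detailed information about the job to external callers
Implementation class: DefaultScheduler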
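2.2. Scheduling Strategy
SchedulingStrategy is an interface that defines four methods:

| Method | Description |
|---|---|
| `void startScheduling();` | Scheduling entry point; triggers the scheduler's scheduling actions |
| `void restartTasks(Set<ExecutionVertexID> verticesToRestart);` | Restarts tasks that failed, usually because of an exception during task execution |
| `void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState);` | Called when an Execution changes state |
| `void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId);` | Called when the data in an IntermediateResultPartition becomes consumable |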
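The interface contract in the table can be made concrete with a minimal sketch. This is not Flink code. `SimpleSchedulingStrategy`, `SimpleSchedulerOperations` and `ToyEagerStrategy` are hypothetical stand-ins that use plain strings for vertex IDs, states and partition IDs. The toy strategy deploys everything up front, in the spirit of eager scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for Flink's types: vertex IDs, states and
// partition IDs are plain strings here.
interface SimpleSchedulerOperations {
    void allocateSlotsAndDeploy(List<String> vertexIds);
}

interface SimpleSchedulingStrategy {
    void startScheduling();                           // scheduling entry point
    void restartTasks(Set<String> verticesToRestart); // redeploy failed vertices
    void onExecutionStateChange(String vertexId, String newState);
    void onPartitionConsumable(String resultPartitionId);
}

// Toy strategy in the spirit of EagerSchedulingStrategy: deploy every vertex up front.
final class ToyEagerStrategy implements SimpleSchedulingStrategy {
    private final List<String> allVertices; // assumed to be in topological order
    private final SimpleSchedulerOperations ops;

    ToyEagerStrategy(List<String> allVertices, SimpleSchedulerOperations ops) {
        this.allVertices = allVertices;
        this.ops = ops;
    }

    @Override
    public void startScheduling() {
        ops.allocateSlotsAndDeploy(new ArrayList<>(allVertices));
    }

    @Override
    public void restartTasks(Set<String> verticesToRestart) {
        // Redeploy only the failed vertices, keeping their original order.
        List<String> toDeploy = new ArrayList<>();
        for (String vertex : allVertices) {
            if (verticesToRestart.contains(vertex)) {
                toDeploy.add(vertex);
            }
        }
        ops.allocateSlotsAndDeploy(toDeploy);
    }

    @Override
    public void onExecutionStateChange(String vertexId, String newState) {
        // Nothing to do: with eager scheduling everything is already deployed.
    }

    @Override
    public void onPartitionConsumable(String resultPartitionId) {
        // Also ignored; a lazy strategy would schedule downstream consumers here.
    }
}
```

The design point is the split of responsibilities. The strategy only decides what to schedule and when. Slot allocation and deployment are handed back to the scheduler through an operations callback; in Flink that callback is `SchedulerOperations#allocateSlotsAndDeploy`.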
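SchedulingStrategy has three implementations:
- EagerSchedulingStrategy: for stream processing; schedules all tasks at the same time.
- LazyFromSourcesSchedulingStrategy: for batch processing; schedules vertices once their input data is ready (that is, once the upstream has finished).
- PipelinedRegionSchedulingStrategy: schedules at the granularity of pipelined regions.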
PipelinedRegionSchedulingStrategy was added in Flink 1.11; starting with 1.12, scheduling is performed per pipelined region.
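A pipelined region is a set of tasks connected by pipelined data exchanges. For a streaming job with multiple regions, this means Flink no longer waits for all tasks to acquire slots before it starts deploying tasks. Instead, any region can be deployed as soon as it has acquired enough task slots. For batch jobs, tasks are no longer assigned slots and deployed individually. Instead, a task is deployed together with all other tasks in the same region once that region has acquired enough slots.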
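The definition of a pipelined region can be illustrated with a union-find pass. Vertices joined by a pipelined edge end up in the same set. A blocking edge merges nothing, so it separates regions. This is a simplified sketch under assumed inputs: integer vertex IDs and a hypothetical `Edge` class. Flink's own region computation may handle additional cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Simplified sketch: vertices are ints 0..n-1; Edge is a hypothetical helper type.
final class PipelinedRegions {
    static final class Edge {
        final int producer;
        final int consumer;
        final boolean pipelined; // false means a BLOCKING exchange

        Edge(int producer, int consumer, boolean pipelined) {
            this.producer = producer;
            this.consumer = consumer;
            this.pipelined = pipelined;
        }
    }

    private static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    /** Vertices connected through pipelined edges share a region; blocking edges split regions. */
    static List<Set<Integer>> compute(int numVertices, List<Edge> edges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i;
        }
        for (Edge e : edges) {
            if (e.pipelined) {
                parent[find(parent, e.producer)] = find(parent, e.consumer); // union
            }
        }
        Map<Integer, Set<Integer>> regionsByRoot = new TreeMap<>();
        for (int v = 0; v < numVertices; v++) {
            regionsByRoot.computeIfAbsent(find(parent, v), root -> new TreeSet<>()).add(v);
        }
        return new ArrayList<>(regionsByRoot.values());
    }
}
```

Consider a chain `0 -> 1 -> 2` of pipelined edges, then a blocking edge `2 -> 3`, then a pipelined edge `3 -> 4`. It yields two regions, `{0, 1, 2}` and `{3, 4}`, and each region can acquire slots and be deployed on its own. A connected job with only pipelined edges collapses into a single region.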
3. Code Walkthrough
3.1. Call Sequence
-> org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
-> org.apache.flink.runtime.jobmaster.JobMaster#resetAndStartScheduler
-> org.apache.flink.runtime.jobmaster.JobMaster#startScheduling
-> org.apache.flink.runtime.scheduler.DefaultScheduler#startSchedulingInternal
-> org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#startScheduling
-> org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy#maybeScheduleRegions
-> org.apache.flink.runtime.scheduler.DefaultScheduler#allocateSlotsAndDeploy
-> org.apache.flink.runtime.scheduler.DefaultScheduler#waitForAllSlotsAndDeploy
-> org.apache.flink.runtime.scheduler.DefaultScheduler#deployAll
-> org.apache.flink.runtime.scheduler.DefaultScheduler#deployOrHandleError
-> org.apache.flink.runtime.scheduler.DefaultScheduler#deployTaskSafe
-> org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations#deploy
-> org.apache.flink.runtime.executiongraph.ExecutionVertex#deploy
-> org.apache.flink.runtime.executiongraph.Execution#deploy
-> org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask
-> org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask
3.2. JobMaster#startJobExecution
```java
// ----------------------------------------------------------------------------------------------
// Internal methods
// ----------------------------------------------------------------------------------------------
// -- job starting and stopping
// -----------------------------------------------------------------
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();
    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }
    setNewFencingToken(newJobMasterId);
    // Actually start the JobMaster services
    startJobMasterServices();
    // Example log output:
    // Starting execution of job Socket Window WordCount (694474d11da6100e82744c9e47e2f511)
    // under job master id 00000000000000000000000000000000.
    log.info(
            "Starting execution of job {} ({}) under job master id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            newJobMasterId);
    // Reset and start the scheduler
    resetAndStartScheduler();
    return Acknowledge.get();
}
```
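The early return in `startJobExecution` makes repeated start requests for the same leader session harmless. Only a new `JobMasterId` (the fencing token) triggers a fresh start. Below is a minimal sketch of that check. It assumes a hypothetical `ToyJobMaster` that uses `UUID` tokens, with a counter in place of the real services and scheduler.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical toy model of the idempotence check at the top of startJobExecution.
final class ToyJobMaster {
    private UUID fencingToken;       // null until the first successful start
    private int schedulerStarts = 0; // counts how often scheduling was actually triggered

    /** Returns true if this call started execution, false if it was a duplicate request. */
    boolean startJobExecution(UUID newJobMasterId) {
        Objects.requireNonNull(newJobMasterId, "The new JobMasterId must not be null.");
        if (Objects.equals(fencingToken, newJobMasterId)) {
            return false; // already running under this leader session: no-op
        }
        // In Flink, fenced RPCs carrying a different token are then rejected.
        fencingToken = newJobMasterId;
        schedulerStarts++; // stands in for startJobMasterServices() + resetAndStartScheduler()
        return true;
    }

    int schedulerStarts() {
        return schedulerStarts;
    }
}
```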
3.3. JobMaster#resetAndStartScheduler
```java
// Reuse or recreate the scheduler, then start scheduling
private void resetAndStartScheduler() throws Exception {
    validateRunsInMainThread();
    final CompletableFuture<Void> schedulerAssignedFuture;
    if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
        schedulerAssignedFuture = CompletableFuture.completedFuture(null);
        schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
    } else {
        suspendAndClearSchedulerFields(
                new FlinkException(
                        "ExecutionGraph is being reset in order to be rescheduled."));
        final JobManagerJobMetricGroup newJobManagerJobMetricGroup =
                jobMetricGroupFactory.create(jobGraph);
        final SchedulerNG newScheduler =
                createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup);
        schedulerAssignedFuture =
                schedulerNG
                        .getTerminationFuture()
                        .handle(
                                (ignored, throwable) -> {
                                    newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                                    assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                                    return null;
                                });
    }
    // Start scheduling once the scheduler has been assigned
    FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
}
```
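The branching in `resetAndStartScheduler` reduces to three steps:

1. Reuse the scheduler if it is still in `CREATED`.
2. Otherwise, wait for the old scheduler's termination future and swap in a new scheduler.
3. Only then start scheduling.

The toy below models just that control flow with `CompletableFuture`. `ToySchedulerHolder`, `ToyScheduler` and their fields are hypothetical stand-ins, and the caller has to complete the old scheduler's termination future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of the "reuse or replace, then start" logic in resetAndStartScheduler.
final class ToySchedulerHolder {
    static final class ToyScheduler {
        final String name;
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        boolean created = true; // stands in for requestJobStatus() == JobStatus.CREATED
        boolean started = false;

        ToyScheduler(String name) {
            this.name = name;
        }
    }

    ToyScheduler current;

    ToySchedulerHolder(ToyScheduler initial) {
        this.current = initial;
    }

    CompletableFuture<Void> resetAndStartScheduler(ToyScheduler replacement) {
        final CompletableFuture<Void> schedulerAssigned;
        if (current.created) {
            // Fresh scheduler: nothing to wait for, reuse it as is.
            schedulerAssigned = CompletableFuture.completedFuture(null);
        } else {
            // Swap in the replacement only after the old scheduler has terminated,
            // normally or exceptionally (hence handle(), which sees both outcomes).
            schedulerAssigned =
                    current.terminationFuture.handle(
                            (ignored, throwable) -> {
                                current = replacement;
                                return null;
                            });
        }
        // Equivalent of thenRun(this::startScheduling).
        return schedulerAssigned.thenRun(() -> current.started = true);
    }
}
```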
3.4. JobMaster#startScheduling
```java
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);
    // Start scheduling; this leads to DefaultScheduler#startSchedulingInternal
    schedulerNG.startScheduling();
}
```
3.5. DefaultScheduler#startSchedulingInternal
```java
@Override
protected void startSchedulingInternal() {
    log.info(
            "Starting scheduling with scheduling strategy [{}]",
            schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling();
    // Default scheduling strategy: PipelinedRegionSchedulingStrategy
    // -> PipelinedRegionSchedulingStrategy#startScheduling
    schedulingStrategy.startScheduling();
}
```
3.6. PipelinedRegionSchedulingStrategy#startScheduling
```java
@Override
public void startScheduling() {
    // Source regions: pipelined regions that consume no upstream result
    final Set<SchedulingPipelinedRegion> sourceRegions =
            IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                    .filter(region -> !region.getConsumedResults().iterator().hasNext())
                    .collect(Collectors.toSet());
    // Try to schedule the source regions
    maybeScheduleRegions(sourceRegions);
}
```
3.7. PipelinedRegionSchedulingStrategy#maybeScheduleRegions
```java
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
            SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
                    schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        // Try each region in topological order
        maybeScheduleRegion(region);
    }
}

private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    if (!areRegionInputsAllConsumable(region)) {
        return;
    }
    checkState(
            areRegionVerticesAllInCreatedState(region),
            "BUG: trying to schedule a region which is not in CREATED state");
    final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
            SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                    regionVerticesSorted.get(region), id -> deploymentOption);
    // Hand over to DefaultScheduler#allocateSlotsAndDeploy
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
```
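Taken together, `startScheduling` and `maybeScheduleRegions` schedule the source regions first. Whenever upstream results become consumable, they schedule the downstream regions in topological order. The sketch below models only that control flow on a tiny region graph, under these assumptions:

- Regions are plain names.
- A region's inputs become consumable when its producer regions finish.
- Kahn's algorithm provides the topological order.

`ToyRegionScheduler` and `onRegionFinished` are hypothetical. In Flink the follow-up scheduling is triggered through the `SchedulingStrategy` callbacks listed in section 2.2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical toy model: `consumes` maps every region to the upstream regions it reads from.
final class ToyRegionScheduler {
    private final Map<String, Set<String>> consumes;
    private final Set<String> finished = new HashSet<>(); // regions whose outputs are consumable
    final List<String> deployed = new ArrayList<>();      // order in which regions were deployed

    ToyRegionScheduler(Map<String, Set<String>> consumes) {
        this.consumes = new TreeMap<>(consumes); // TreeMap gives a deterministic iteration order
    }

    /** Like startScheduling(): source regions are those that consume no upstream result. */
    void startScheduling() {
        Set<String> sources = new HashSet<>();
        consumes.forEach((region, inputs) -> {
            if (inputs.isEmpty()) {
                sources.add(region);
            }
        });
        maybeScheduleRegions(sources);
    }

    /** Called when a region finishes; its consumers may now become schedulable. */
    void onRegionFinished(String region) {
        finished.add(region);
        Set<String> consumers = new HashSet<>();
        consumes.forEach((candidate, inputs) -> {
            if (inputs.contains(region)) {
                consumers.add(candidate);
            }
        });
        maybeScheduleRegions(consumers);
    }

    private void maybeScheduleRegions(Set<String> regions) {
        for (String region : topologicalOrder()) { // visit candidates in topological order
            if (regions.contains(region)) {
                maybeScheduleRegion(region);
            }
        }
    }

    private void maybeScheduleRegion(String region) {
        if (!finished.containsAll(consumes.get(region))) {
            return; // analogous to areRegionInputsAllConsumable(region) == false
        }
        if (deployed.contains(region)) {
            throw new IllegalStateException("BUG: region " + region + " was already scheduled");
        }
        deployed.add(region); // stands in for allocateSlotsAndDeploy(...)
    }

    /** Kahn's algorithm over the region graph. */
    private List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> consumersOf = new HashMap<>();
        consumes.forEach((region, inputs) -> {
            inDegree.put(region, inputs.size());
            for (String producer : inputs) {
                consumersOf.computeIfAbsent(producer, k -> new ArrayList<>()).add(region);
            }
        });
        Deque<String> ready = new ArrayDeque<>();
        for (String region : consumes.keySet()) {
            if (inDegree.get(region) == 0) {
                ready.add(region);
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String region = ready.poll();
            order.add(region);
            for (String consumer : consumersOf.getOrDefault(region, List.of())) {
                if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer);
                }
            }
        }
        return order;
    }
}
```

Suppose regions `A` and `B` are sources and `C` consumes both. Then `startScheduling()` deploys `A` and `B`. Finishing only `A` leaves `C` waiting, and finishing `B` as well deploys `C`.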
3.8. DefaultScheduler#allocateSlotsAndDeploy
```java
// ------------------------------------------------------------------------
// SchedulerOperations
// ------------------------------------------------------------------------
@Override
public void allocateSlotsAndDeploy(
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    validateDeploymentOptions(executionVertexDeploymentOptions);
    // deploymentOptionsByVertex (debugger snapshot):
    // {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" -> {ExecutionVertexDeploymentOption@7484}
    // {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" -> {ExecutionVertexDeploymentOption@7485}
    // {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2" -> {ExecutionVertexDeploymentOption@7486}
    // {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0" -> {ExecutionVertexDeploymentOption@7487}
    // {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3" -> {ExecutionVertexDeploymentOption@7488}
    // {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" -> {ExecutionVertexDeploymentOption@7489}
    // {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexDeploymentOption@7490}
    // {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0" -> {ExecutionVertexDeploymentOption@7491}
    // {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexDeploymentOption@7492}
    // {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1" -> {ExecutionVertexDeploymentOption@7493}
    final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
            groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
    // verticesToDeploy:
    // 0 = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
    // 1 = {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0"
    // 2 = {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1"
    // 3 = {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2"
    // 4 = {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3"
    // 5 = {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0"
    // 6 = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
    // 7 = {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2"
    // 8 = {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3"
    // 9 = {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0"
    final List<ExecutionVertexID> verticesToDeploy =
            executionVertexDeploymentOptions.stream()
                    .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                    .collect(Collectors.toList());
    // requiredVersionByVertex:
    // {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" -> {ExecutionVertexVersion@7566}
    //     key = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
    //     value = {ExecutionVertexVersion@7566}
    //         executionVertexId = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0"
    //         version = 1
    // {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" -> {ExecutionVertexVersion@7567}
    //     key = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
    //     value = {ExecutionVertexVersion@7567}
    //         executionVertexId = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1"
    //         version = 1
    // {ExecutionVertexID@7445} "0a448493b4782967b150582570326227_2" -> {ExecutionVertexVersion@7568}
    // {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0" -> {ExecutionVertexVersion@7569}
    // {ExecutionVertexID@7448} "0a448493b4782967b150582570326227_3" -> {ExecutionVertexVersion@7570}
    // {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" -> {ExecutionVertexVersion@7571}
    // {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexVersion@7572}
    // {ExecutionVertexID@7441} "0a448493b4782967b150582570326227_0" -> {ExecutionVertexVersion@7573}
    // {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexVersion@7574}
    // {ExecutionVertexID@7443} "0a448493b4782967b150582570326227_1" -> {ExecutionVertexVersion@7575}
    final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
            executionVertexVersioner.recordVertexModifications(verticesToDeploy);
    transitionToScheduled(verticesToDeploy);
    // Allocate slots
    // slotExecutionVertexAssignments = {ArrayList@8148} size = 10
    //     0 = {SlotExecutionVertexAssignment@8064}
    //     1 = {SlotExecutionVertexAssignment@8070}
    //     2 = {SlotExecutionVertexAssignment@8072}
    //     3 = {SlotExecutionVertexAssignment@8066}
    //     4 = {SlotExecutionVertexAssignment@8068}
    //     5 = {SlotExecutionVertexAssignment@8067}
    //     6 = {SlotExecutionVertexAssignment@8065}
    //     7 = {SlotExecutionVertexAssignment@8073}
    //     8 = {SlotExecutionVertexAssignment@8071}
    //     9 = {SlotExecutionVertexAssignment@8069}
    final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
            allocateSlots(executionVertexDeploymentOptions);
    // A DeploymentHandle bundles the required vertex version, the deployment option
    // and the slot assignment
    // deploymentHandles = {ArrayList@8194} size = 10
    //     0 = {DeploymentHandle@8212}
    //         requiredVertexVersion = {ExecutionVertexVersion@7566}
    //         executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484}
    //         slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8064}
    //     1 = {DeploymentHandle@8213}
    //         requiredVertexVersion = {ExecutionVertexVersion@7573}
    //         executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7491}
    //         slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8070}
    //     2 = {DeploymentHandle@8214}
    //     3 = {DeploymentHandle@8215}
    //     4 = {DeploymentHandle@8216}
    //     5 = {DeploymentHandle@8217}
    //     6 = {DeploymentHandle@8218}
    //     7 = {DeploymentHandle@8219}
    //     8 = {DeploymentHandle@8220}
    //     9 = {DeploymentHandle@8221}
    final List<DeploymentHandle> deploymentHandles =
            createDeploymentHandles(
                    requiredVersionByVertex,
                    deploymentOptionsByVertex,
                    slotExecutionVertexAssignments);
    // Start deploying
    waitForAllSlotsAndDeploy(deploymentHandles);
}
```
3.9. DefaultScheduler#waitForAllSlotsAndDeploy
```java
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    // Assign the allocated slots to all vertices, then deploy them
    FutureUtils.assertNoException(
            assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}
```
3.10. DefaultScheduler#deployAll
```java
private BiFunction<Void, Throwable, Void> deployAll(
        final List<DeploymentHandle> deploymentHandles) {
    return (ignored, throwable) -> {
        propagateIfNonNull(throwable);
        for (final DeploymentHandle deploymentHandle : deploymentHandles) {
            final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
                    deploymentHandle.getSlotExecutionVertexAssignment();
            final CompletableFuture<LogicalSlot> slotAssigned =
                    slotExecutionVertexAssignment.getLogicalSlotFuture();
            checkState(slotAssigned.isDone());
            // The slot is assigned: deploy the task, or handle the error (deployOrHandleError)
            FutureUtils.assertNoException(
                    slotAssigned.handle(deployOrHandleError(deploymentHandle)));
        }
        return null;
    };
}
```
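`waitForAllSlotsAndDeploy` and `deployAll` implement a two-phase pattern:

1. Wait until every slot request in the batch has settled.
2. Walk the handles and deploy (or fail) each vertex individually.

Below is a self-contained sketch of the same pattern with plain `CompletableFuture`s. `ToyDeployAll` is hypothetical, slots are modeled as strings, and the log strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy: a "slot" is just a String; vertex i uses slotFutures.get(i).
final class ToyDeployAll {
    static CompletableFuture<Void> waitForAllSlotsAndDeploy(
            List<CompletableFuture<String>> slotFutures, List<String> log) {
        // Step 1 (cf. assignAllResources): handle() turns each slot outcome, success or
        // failure, into a normal completion, so the combined future below completes
        // normally once every slot request has settled.
        List<CompletableFuture<Void>> settled = new ArrayList<>();
        for (CompletableFuture<String> slotFuture : slotFutures) {
            settled.add(slotFuture.handle((slot, throwable) -> null));
        }
        return CompletableFuture.allOf(settled.toArray(new CompletableFuture<?>[0]))
                // Step 2 (cf. deployAll): every slot future is done; deploy or fail each vertex.
                .thenRun(
                        () -> {
                            for (int i = 0; i < slotFutures.size(); i++) {
                                CompletableFuture<String> slotFuture = slotFutures.get(i);
                                if (!slotFuture.isDone()) {
                                    throw new IllegalStateException("slot " + i + " not done");
                                }
                                if (slotFuture.isCompletedExceptionally()) {
                                    log.add("fail v" + i); // cf. handleTaskDeploymentFailure
                                } else {
                                    log.add("deploy v" + i + " on " + slotFuture.join());
                                }
                            }
                        });
    }
}
```

Because nothing is deployed until the whole batch has settled, the vertices handed over together start together. In the toy, a failed slot only produces a failure entry for its own vertex. In Flink, that path goes through `handleTaskDeploymentFailure`.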
3.11. DefaultScheduler#deployOrHandleError
```java
private BiFunction<Object, Throwable, Void> deployOrHandleError(
        final DeploymentHandle deploymentHandle) {
    final ExecutionVertexVersion requiredVertexVersion =
            deploymentHandle.getRequiredVertexVersion();
    final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
    return (ignored, throwable) -> {
        if (executionVertexVersioner.isModified(requiredVertexVersion)) {
            log.debug(
                    "Refusing to deploy execution vertex {} because this deployment was "
                            + "superseded by another deployment",
                    executionVertexId);
            return null;
        }
        if (throwable == null) {
            // Deploy the task
            deployTaskSafe(executionVertexId);
        } else {
            handleTaskDeploymentFailure(executionVertexId, throwable);
        }
        return null;
    };
}
```
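The version check in `deployOrHandleError` guards against a race. Slot allocation is asynchronous, so by the time a slot arrives the vertex may already have been scheduled again, for instance after a failover. For that reason `allocateSlotsAndDeploy` records a version per vertex, and a deployment whose version is out of date is refused. Below is a minimal sketch of that idea, assuming string vertex IDs. `ToyVersioner` and its method names are illustrative stand-ins, not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy version of the idea behind ExecutionVertexVersioner.
final class ToyVersioner {
    private final Map<String, Long> versions = new HashMap<>();

    /** Bumps the vertex's version and returns the new value (the first call returns 1). */
    long recordModification(String vertexId) {
        return versions.merge(vertexId, 1L, Long::sum);
    }

    /** True if the vertex was modified again after {@code requiredVersion} was recorded. */
    boolean isModified(String vertexId, long requiredVersion) {
        return versions.getOrDefault(vertexId, 0L) != requiredVersion;
    }

    /** Mirrors the check at the top of deployOrHandleError: stale deployments are skipped. */
    boolean deployIfCurrent(String vertexId, long requiredVersion, Runnable deploy) {
        if (isModified(vertexId, requiredVersion)) {
            return false; // superseded by a newer scheduling of the same vertex
        }
        deploy.run();
        return true;
    }
}
```

The first version recorded for a vertex is 1, which matches the `version = 1` entries in the debugger snapshot of `requiredVersionByVertex`.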
3.12. DefaultScheduler#deployTaskSafe
```java
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        // Look up the ExecutionVertex
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        // Deploy the ExecutionVertex via DefaultExecutionVertexOperations#deploy
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}
```
3.13. DefaultExecutionVertexOperations#deploy
```java
@Override
public void deploy(final ExecutionVertex executionVertex) throws JobException {
    executionVertex.deploy();
}
```
3.14. ExecutionVertex#deploy
```java
public void deploy() throws JobException {
    // Deploy the current Execution attempt
    currentExecution.deploy();
}
```
3.15. Execution#deploy
```java
/**
 * Deploys the execution to the previously assigned resource.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
public void deploy() throws JobException {
    assertRunningInJobMasterMainThread();
    // Get the assigned slot (debugger snapshot):
    // slotRequestId = {SlotRequestId@8931} "SlotRequestId{7d3611a3599a124ed703d75c55561420}"
    // slotContext = {AllocatedSlot@8932} "AllocatedSlot e5eeb5d0e767c407ea81ab345a14ebd8 @ container_1619273419318_0017_01_000002 @ henghe-030 (dataPort=39722) - 0"
    // slotSharingGroupId = null
    // locality = {Locality@8933} "UNKNOWN"
    // slotOwner = {SharedSlot@8934}
    // releaseFuture = {CompletableFuture@8935} "java.util.concurrent.CompletableFuture@7ea60a0f[Not completed]"
    // state = {SingleLogicalSlot$State@8936} "ALIVE"
    // payload = {Execution@8899} "Attempt #0 (Source: Socket Stream (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@7f697d27 - [SCHEDULED]"
    // willBeOccupiedIndefinitely = true
    final LogicalSlot slot = assignedResource;
    checkNotNull(
            slot,
            "In order
```