[Flink] Flink Source Code: ExecutionGraph
Posted by 九师兄
Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers the Flink source code for the ExecutionGraph and is offered as a reference.
1. Overview
An older article on this topic was based on Flink 1.9; this one is based on Flink 1.13.
Reference: 95-230-028 Source Code: WordCount Walkthrough, Getting the ExecutionGraph
Reprinted from: Flink Source Code Analysis Series (table of contents)
2. From JobGraph to ExecutionGraph
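The JobGraph is submitted through Dispatcher.submitJob, which is the entry point for everything that follows. It calls Dispatcher.internalSubmitJob, which in turn calls Dispatcher.persistAndRunJob.
Dispatcher.persistAndRunJob persists the job and then runs it: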
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}
Dispatcher.runJob takes two arguments: the JobGraph and the execution type. There are two execution types, job submission (SUBMISSION) and job recovery (RECOVERY).
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    // Make sure the job with this JobID is not already running, to avoid duplicate submission
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // Record the initialization timestamp
    long initializationTimestamp = System.currentTimeMillis();
    // Create the JobManagerRunner here;
    // the JobManagerRunner will then construct the JobManager
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
            createJobManagerRunner(jobGraph, initializationTimestamp);
    // Wrap the JobGraph-related information for use by the Dispatcher
    DispatcherJob dispatcherJob =
            DispatcherJob.createFor(
                    jobManagerRunnerFuture,
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    initializationTimestamp);
    // Add the job to runningJobs,
    // marking it as running
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);
    final JobID jobId = jobGraph.getJobID();
    // Handle the result of the dispatched job
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            dispatcherJob
                    .getResultFuture()
                    .handleAsync(
                            (dispatcherJobResult, throwable) -> {
                                Preconditions.checkState(
                                        runningJobs.get(jobId) == dispatcherJob,
                                        "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
                                if (dispatcherJobResult != null) {
                                    return handleDispatcherJobResult(
                                            jobId, dispatcherJobResult, executionType);
                                } else {
                                    return dispatcherJobFailed(jobId, throwable);
                                }
                            },
                            getMainThreadExecutor());
    // When the job terminates, remove its JobID from runningJobs
    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture
                    .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
                    .thenCompose(Function.identity());
    FutureUtils.assertNoException(jobTerminationFuture);
    // Register the job ID and its termination future in dispatcherJobTerminationFutures
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
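Stripped of Flink specifics, runJob is a small bookkeeping pattern: reject a duplicate job ID, register the job, decide a cleanup state when the result future completes, then remove the job. The sketch below reproduces only that pattern with plain java.util.concurrent types. All names (`JobRegistrySketch`, `submit`, the "FINISHED"/"FAILED" strings) are hypothetical, not Flink APIs. A daemon single-thread executor stands in for the Dispatcher's main-thread executor, and because the sketch may be called from several threads, an atomic `putIfAbsent` replaces the checkState/put pair.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of Dispatcher-style running-job bookkeeping; not Flink code. */
class JobRegistrySketch {
    // Job ID -> result future of the running job.
    private final Map<String, CompletableFuture<String>> runningJobs = new ConcurrentHashMap<>();

    // Daemon single-thread executor standing in for the Dispatcher's main-thread executor.
    private final ExecutorService mainThread =
            Executors.newSingleThreadExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "main-thread-sketch");
                        thread.setDaemon(true);
                        return thread;
                    });

    /** Registers a job; the returned future holds its cleanup state once it has been removed. */
    CompletableFuture<String> submit(String jobId, CompletableFuture<String> resultFuture) {
        // Atomically reject a second submission of the same job ID (runJob uses checkState).
        if (runningJobs.putIfAbsent(jobId, resultFuture) != null) {
            throw new IllegalStateException("Job " + jobId + " is already running");
        }
        return resultFuture
                // Decide the cleanup state on the "main thread", for success and failure alike.
                .handleAsync((result, error) -> error == null ? "FINISHED" : "FAILED", mainThread)
                // Then drop the job from the registry, similar in spirit to removeJob(jobId, state).
                .thenApply(
                        state -> {
                            runningJobs.remove(jobId);
                            return state;
                        });
    }

    boolean isRunning(String jobId) {
        return runningJobs.containsKey(jobId);
    }

    public static void main(String[] args) {
        JobRegistrySketch registry = new JobRegistrySketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<String> cleanup = registry.submit("job-1", result);
        System.out.println("running: " + registry.isRunning("job-1"));
        result.complete("ok");
        System.out.println("cleanup state: " + cleanup.join());
        System.out.println("running: " + registry.isRunning("job-1"));
    }
}
```

As in runJob, the entry is removed only after the result has been handled, so a second submission of the same ID is rejected for the entire lifetime of the first one.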
Next comes Dispatcher.createJobManagerRunner.
The JobManager is created inside the Dispatcher and then started. The creation logic lives in createJobManagerRunner:
CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // Create the JobManager through the factory,
                    // passing in the JobGraph and the high-availability services
                    JobManagerRunner runner =
                            jobManagerRunnerFactory.createJobManagerRunner(
                                    jobGraph,
                                    configuration,
                                    rpcService,
                                    highAvailabilityServices,
                                    heartbeatServices,
                                    jobManagerSharedServices,
                                    new DefaultJobManagerJobMetricGroupFactory(
                                            jobManagerMetricGroup),
                                    fatalErrorHandler,
                                    initializationTimestamp);
                    // Start the JobManager; this actually starts the JobManager's
                    // leader election service, which elects the leading JM
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(
                            new JobInitializationException(
                                    jobGraph.getJobID(),
                                    "Could not instantiate JobManager.",
                                    e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
    // JobManager creation
}
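The key design choices here are running the potentially slow JobManagerRunner construction on ioExecutor instead of the Dispatcher's main thread, and rethrowing checked exceptions as a CompletionException because the supplier lambda cannot throw them. Below is a minimal sketch of that pattern. `CheckedSupplier`, `createAsync` and `newIoExecutor` are hypothetical helpers, and IllegalStateException stands in for Flink's JobInitializationException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of creating a component off the main thread; not Flink code. */
class AsyncCreationSketch {
    /** Hypothetical factory interface whose get() may throw a checked exception. */
    @FunctionalInterface
    interface CheckedSupplier<T> {
        T get() throws Exception;
    }

    /** Daemon I/O pool; its threads are named "io-sketch" so callers can see where work ran. */
    static ExecutorService newIoExecutor() {
        return Executors.newFixedThreadPool(
                2,
                runnable -> {
                    Thread thread = new Thread(runnable, "io-sketch");
                    thread.setDaemon(true);
                    return thread;
                });
    }

    /** Runs the factory on ioExecutor and reports failures as a CompletionException. */
    static <T> CompletableFuture<T> createAsync(CheckedSupplier<T> factory, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return factory.get();
                    } catch (Exception e) {
                        // A Supplier cannot throw checked exceptions, so wrap the failure;
                        // IllegalStateException stands in for JobInitializationException.
                        throw new CompletionException(
                                new IllegalStateException("Could not instantiate runner.", e));
                    }
                },
                ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = newIoExecutor();
        String thread = createAsync(() -> Thread.currentThread().getName(), io).join();
        System.out.println("created on: " + thread);
        try {
            createAsync(() -> { throw new java.io.IOException("disk full"); }, io).join();
        } catch (CompletionException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

Callers that join() the future see the CompletionException directly, with the domain exception as its cause and the original failure one level further down.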
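At this point the JobManager starts competing for leadership. To avoid a single point of failure, Flink supports JobManager high availability, in which several JobManager instances can run at the same time. In a Standalone deployment, leader election among the JobManagers is implemented with ZooKeeper; in YARN cluster mode, JobManager high availability relies on YARN automatically restarting the ApplicationMaster after it fails. For details on leader election, see Flink Source Code: Leader Election (ZooKeeper).
Once a leader JM has been elected, the election service calls that JM's grantLeadership method: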
@Override
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug(
                    "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }
        leadershipOperation =
                leadershipOperation.thenRun(
                        ThrowingRunnable.unchecked(
                                () -> {
                                    synchronized (lock) {
                                        // This is the main logic:
                                        // check the job scheduling status and start the JobManager
                                        verifyJobSchedulingStatusAndStartJobManager(
                                                leaderSessionID);
                                    }
                                }));
        handleException(leadershipOperation, "Could not start the job manager.");
    }
}
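grantLeadership does not start the JobMaster directly. It appends the work to the leadershipOperation future, so leadership callbacks run one after another, in the order they arrive. The following hypothetical sketch (`LeadershipSketch`, string session IDs, an in-memory event list) shows only that chaining idea; it is not the JobManagerRunnerImpl implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of serializing leadership callbacks by chaining futures; not Flink code. */
class LeadershipSketch {
    private final Object lock = new Object();
    private final List<String> events = new ArrayList<>();
    private boolean shutdown;

    // Tail of the chain: each new leadership operation runs after the previous one completes.
    private CompletableFuture<Void> leadershipOperation = CompletableFuture.completedFuture(null);

    void grantLeadership(String leaderSessionId) {
        synchronized (lock) {
            if (shutdown) {
                // Mirrors the early return when the runner is already shut down.
                events.add("ignored " + leaderSessionId);
                return;
            }
            leadershipOperation =
                    leadershipOperation.thenRun(
                            () -> {
                                synchronized (lock) {
                                    // Stands in for verifyJobSchedulingStatusAndStartJobManager.
                                    events.add("start " + leaderSessionId);
                                }
                            });
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    /** Waits for all queued operations and returns a snapshot of what happened, in order. */
    List<String> awaitEvents() {
        CompletableFuture<Void> tail;
        synchronized (lock) {
            tail = leadershipOperation;
        }
        tail.join();
        synchronized (lock) {
            return new ArrayList<>(events);
        }
    }

    public static void main(String[] args) {
        LeadershipSketch runner = new LeadershipSketch();
        runner.grantLeadership("session-1");
        runner.grantLeadership("session-2");
        runner.shutdown();
        runner.grantLeadership("session-3");
        System.out.println(runner.awaitEvents());
    }
}
```

Because this chain starts from an already-completed future, thenRun executes synchronously on the calling thread here; if an earlier operation were still in progress, the new one would wait for it instead.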
Next we follow the call into verifyJobSchedulingStatusAndStartJobManager.
@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
        throws FlinkException {
    // If the JobManagerRunner has already been shut down, return immediately
    if (shutdown) {
        log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
        return;
    }
    // Look up the job's scheduling status in the RunningJobsRegistry
    final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
            getJobSchedulingStatus();
    // If the job has already finished, run the "job already done" logic
    // (that is, the logic for a job that was not completed by this JobManager)
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
        jobAlreadyDone();
    } else {
        // Start the JobMaster
        startJobMaster(leaderSessionId);
    }
}
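The logic now moves to JobManagerRunnerImpl.startJobMaster.
This method starts the JobMaster: it marks the job as running in the RunningJobsRegistry, starts the JobMaster service, and confirms that this JobMaster is the leader.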
@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
    log.info(
            "JobManager runner for job {} ({}) was granted leadership with session id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            leaderSessionId);
    try {
        // Mark the job as running
        // Depending on the deployment (Standalone, ZooKeeper or K8s), the JobID is stored differently
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        throw new FlinkException(
                String.format(
                        "Failed to set the job %s to running in the running jobs registry.",
                        jobGraph.getJobID()),
                e);
    }
    // Start the JobMaster service
    startJobMasterServiceSafely(leaderSessionId);
    // Confirm that this JobMaster is still the leader
    if (jobMasterService != null) {
        confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
    }
}
JobManagerRunnerImpl.startJobMasterServiceSafely then uses DefaultJobMasterServiceFactory.createJobMasterService to create the JobMaster and start its RPC service.
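Next, the JobMaster constructor contains the logic that builds the scheduler for the Flink job. JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler to create Flink's scheduler, which in turn calls the scheduler factory method DefaultSchedulerFactory.createInstance to create the scheduler instance.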
The flow then reaches DefaultScheduler, the default implementation of Flink's job scheduler. DefaultScheduler extends SchedulerBase, which implements the SchedulerNG interface.
The SchedulerBase constructor calls createAndRestoreExecutionGraph.
SchedulerBase.createAndRestoreExecutionGraph looks like this:
private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {
    // Create the ExecutionGraph
    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);
    // Get the CheckpointCoordinator created inside the ExecutionGraph
    // (how it is created is covered in a later section)
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        // (restores the latest completed checkpoint, if there is one)
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
            // check whether we can restore from a savepoint
            // (only reached when no checkpoint was restored)
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }
    // Register the listener for internal task failures
    newExecutionGraph.setInternalTaskFailuresListener(
            new UpdateSchedulerNgOnInternalFailuresListener(this));
    // Register the job status listener
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    // Start the graph with the JobMaster's main-thread executor
    newExecutionGraph.start(mainThreadExecutor);
    return newExecutionGraph;
}
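The restore order above can be summarized as: without a CheckpointCoordinator nothing is restored; a completed checkpoint takes precedence; otherwise the savepoint from the job's SavepointRestoreSettings is tried. The sketch below encodes that decision as a pure function. `RestoreOrderSketch` and its parameters are hypothetical simplifications: a checkpoint ID and a savepoint path stand in for the real coordinator and settings objects.

```java
import java.util.Optional;

/** Hypothetical model of the restore decision in createAndRestoreExecutionGraph; not Flink code. */
class RestoreOrderSketch {
    static String chooseRestoreSource(
            boolean hasCheckpointCoordinator,
            Optional<Long> latestCompletedCheckpointId,
            Optional<String> savepointPath) {
        if (!hasCheckpointCoordinator) {
            // No CheckpointCoordinator: both restore attempts are skipped.
            return "none";
        }
        if (latestCompletedCheckpointId.isPresent()) {
            // A completed checkpoint is preferred over the configured savepoint.
            return "checkpoint " + latestCompletedCheckpointId.get();
        }
        // Otherwise fall back to the savepoint from the restore settings, if one is configured.
        return savepointPath.map(path -> "savepoint " + path).orElse("none");
    }

    public static void main(String[] args) {
        System.out.println(chooseRestoreSource(true, Optional.of(42L), Optional.of("/savepoints/sp-1")));
        System.out.println(chooseRestoreSource(true, Optional.empty(), Optional.of("/savepoints/sp-1")));
        System.out.println(chooseRestoreSource(false, Optional.of(42L), Optional.empty()));
    }
}
```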
SchedulerBase.createExecutionGraph calls DefaultExecutionGraphBuilder to build the ExecutionGraph:
private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws JobExecutionException, JobException {
    // Create the Execution deployment listener
    ExecutionDeploymentListener executionDeploymentListener =
            new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    // Create the Execution state update listener:
    // stop tracking an execution's deployment once it reaches a terminal state
    ExecutionStateUpdateListener executionStateUpdateListener =
            (execution, newState) -> {
                if (newState.isTerminal()) {
                    executionDeploymentTracker.stopTrackingDeploymentOf(execution);
                }
            };
    // Build the ExecutionGraph
    return DefaultExecutionGraphBuilder.buildGraph(
            jobGraph,
            jobMasterConfiguration,
            futureExecutor,
            ioExecutor,
            userCodeLoader,
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            rpcTimeout,
            currentJobManagerJobMetricGroup,
            blobWriter,
            log,
            shuffleMaster,
            partitionTracker,
            TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                    jobGraph.getJobType()),
            executionDeploymentListener,
            executionStateUpdateListener,
            initializationTimestamp,
            new DefaultVertexAttemptNumberStore());
}
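The ExecutionStateUpdateListener lambda above simply stops tracking an execution's deployment once it reaches a terminal state. Below is a hypothetical, self-contained model of that listener. The `State` enum is a simplified subset of Flink's ExecutionState in which FINISHED, CANCELED and FAILED are the terminal states; `StateUpdateListener` and `DeploymentTracker` are illustrative stand-ins for ExecutionStateUpdateListener and ExecutionDeploymentTracker.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of "stop tracking on a terminal state"; not Flink code. */
class StateListenerSketch {
    /** Simplified subset of execution states; FINISHED, CANCELED and FAILED are terminal. */
    enum State {
        CREATED, SCHEDULED, DEPLOYING, RUNNING, CANCELING, FINISHED, CANCELED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    /** Illustrative stand-in for ExecutionStateUpdateListener. */
    @FunctionalInterface
    interface StateUpdateListener {
        void onStateUpdate(String executionId, State newState);
    }

    /** Illustrative stand-in for ExecutionDeploymentTracker. */
    static final class DeploymentTracker {
        private final Set<String> tracked = new HashSet<>();

        void startTracking(String executionId) {
            tracked.add(executionId);
        }

        void stopTracking(String executionId) {
            tracked.remove(executionId);
        }

        boolean isTracked(String executionId) {
            return tracked.contains(executionId);
        }
    }

    /** Same shape as the lambda in createExecutionGraph. */
    static StateUpdateListener stopTrackingOnTerminal(DeploymentTracker tracker) {
        return (executionId, newState) -> {
            if (newState.isTerminal()) {
                tracker.stopTracking(executionId);
            }
        };
    }

    public static void main(String[] args) {
        DeploymentTracker tracker = new DeploymentTracker();
        StateUpdateListener listener = stopTrackingOnTerminal(tracker);
        tracker.startTracking("attempt-0");
        listener.onStateUpdate("attempt-0", State.RUNNING);
        System.out.println("tracked while RUNNING: " + tracker.isTracked("attempt-0"));
        listener.onStateUpdate("attempt-0", State.FAILED);
        System.out.println("tracked after FAILED: " + tracker.isTracked("attempt-0"));
    }
}
```

Note that CANCELING is not terminal, so an execution that is still being cancelled stays tracked until it reaches CANCELED.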
ExecutionGraph concepts
The ExecutionGraph is the physical execution plan of a Flink job. It coordinates the distributed execution of the data flow.
Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated in the JobManager.
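The ExecutionGraph also has vertices. Its vertices are ExecutionJobVertex objects, each corresponding to a JobVertex in the JobGraph. Going from the JobGraph to the ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all the parallel subtasks of its JobVertex, and each parallel subtask is represented by an ExecutionVertex. In other words, an ExecutionJobVertex with parallelism N contains N ExecutionVertex objects.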
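The parallelism expansion can be pictured with a few plain classes: one ExecutionJobVertex per JobVertex, holding one ExecutionVertex per subtask index. This is a hypothetical model. The `*Sketch` classes and the "name (i/N)" label format are illustrative only and omit everything else the real Flink classes carry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of JobVertex -> ExecutionJobVertex -> ExecutionVertex; not Flink code. */
class ParallelExpansionSketch {
    static final class JobVertexSketch {
        final String name;
        final int parallelism;

        JobVertexSketch(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    /** One parallel subtask of an ExecutionJobVertex. */
    static final class ExecutionVertexSketch {
        final String taskName;
        final int subtaskIndex; // 0-based
        final int parallelism;

        ExecutionVertexSketch(String taskName, int subtaskIndex, int parallelism) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        /** Illustrative label such as "Map (1/3)". */
        String label() {
            return taskName + " (" + (subtaskIndex + 1) + "/" + parallelism + ")";
        }
    }

    /** Expands one JobVertex into as many ExecutionVertex objects as its parallelism. */
    static final class ExecutionJobVertexSketch {
        final JobVertexSketch jobVertex;
        final List<ExecutionVertexSketch> taskVertices;

        ExecutionJobVertexSketch(JobVertexSketch jobVertex) {
            if (jobVertex.parallelism < 1) {
                throw new IllegalArgumentException("parallelism must be at least 1");
            }
            List<ExecutionVertexSketch> vertices = new ArrayList<>();
            for (int i = 0; i < jobVertex.parallelism; i++) {
                vertices.add(new ExecutionVertexSketch(jobVertex.name, i, jobVertex.parallelism));
            }
            this.jobVertex = jobVertex;
            this.taskVertices = Collections.unmodifiableList(vertices);
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertexSketch map = new ExecutionJobVertexSketch(new JobVertexSketch("Map", 3));
        for (ExecutionVertexSketch vertex : map.taskVertices) {
            System.out.println(vertex.label());
        }
    }
}
```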
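An ExecutionVertex may be executed one or more times (because of failure recovery, recomputation, or reconfiguration). Each execution of an ExecutionVertex creates an Execution object, which tracks the state transitions and resource usage of that execution attempt.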
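The attempt model can be illustrated as follows: each retry of an ExecutionVertex gets a new Execution with the next attempt number, while older attempts are kept in a bounded history. The buildGraph code below reads the size of that history from JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE. The sketch is a hypothetical model; `ExecutionAttemptsSketch`, `startNewAttempt` and the deque-based history are assumptions for illustration, not Flink's actual data structures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of execution attempts with a bounded history; not Flink code. */
class ExecutionAttemptsSketch {
    /** One attempt to run a subtask. */
    static final class Execution {
        final int attemptNumber;

        Execution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    /** A subtask that may be executed several times. */
    static final class ExecutionVertex {
        private final int maxPriorAttempts;
        private final Deque<Execution> priorExecutions = new ArrayDeque<>();
        private Execution currentExecution = new Execution(0);

        ExecutionVertex(int maxPriorAttempts) {
            if (maxPriorAttempts < 0) {
                throw new IllegalArgumentException("history size must not be negative");
            }
            this.maxPriorAttempts = maxPriorAttempts;
        }

        /** Archives the current attempt and starts a new one with the next attempt number. */
        Execution startNewAttempt() {
            priorExecutions.addLast(currentExecution);
            // Evict the oldest attempts so that at most maxPriorAttempts are kept.
            while (priorExecutions.size() > maxPriorAttempts) {
                priorExecutions.removeFirst();
            }
            currentExecution = new Execution(currentExecution.attemptNumber + 1);
            return currentExecution;
        }

        int currentAttemptNumber() {
            return currentExecution.attemptNumber;
        }

        List<Integer> priorAttemptNumbers() {
            List<Integer> numbers = new ArrayList<>();
            for (Execution execution : priorExecutions) {
                numbers.add(execution.attemptNumber);
            }
            return numbers;
        }
    }

    public static void main(String[] args) {
        ExecutionVertex vertex = new ExecutionVertex(2);
        for (int i = 0; i < 3; i++) {
            vertex.startNewAttempt();
        }
        System.out.println("current attempt: " + vertex.currentAttemptNumber());
        System.out.println("kept prior attempts: " + vertex.priorAttemptNumbers());
    }
}
```

Bounding the history keeps memory use constant for jobs that restart many times, at the cost of forgetting the oldest attempts.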
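IntermediateResult corresponds to the IntermediateDataSet produced by a JobVertex in the JobGraph. It represents the intermediate storage for data exchanged between two adjacent ExecutionJobVertex objects. IntermediateResults are created together with the producing ExecutionJobVertex, one for each IntermediateDataSet of its JobVertex, and each IntermediateResult consists of as many IntermediateResultPartitions as the vertex's parallelism.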
The buildGraph method of DefaultExecutionGraphBuilder
public static ExecutionGraph buildGraph(
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        ClassLoader classLoader,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        Time rpcTimeout,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore)
        throws JobExecutionException, JobException {
    checkNotNull(jobGraph, "job graph cannot be null");
    // Get the job name and job ID
    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();
    // Create the JobInformation, which wraps the job-level
    // configuration used by the ExecutionGraph
    final JobInformation jobInformation =
            new JobInformation(
                    jobId,
                    jobName,
                    jobGraph.getSerializedExecutionConfig(),
                    jobGraph.getJobConfiguration(),
                    jobGraph.getUserJarBlobKeys(),
                    jobGraph.getClasspaths());
    // Maximum number of prior execution attempts kept in the history
    final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
    // Load the factory for the IntermediateResultPartition release strategy
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
                    jobManagerConfig);
    // create a new execution graph, if none exists so far
    // (DefaultExecutionGraph is analyzed in a later section)
    final DefaultExecutionGraph executionGraph;
    try {
        executionGraph =
                new DefaultExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        maxPriorAttemptsHistoryLength,
                        classLoader,
                        blobWriter,
                        partitionReleaseStrategyFactory,
                        shuffleMaster,
                        partitionTracker,
                        partitionLocationConstraint,
                        executionDeploymentListener,
                        executionStateUpdateListener,
                        initializationTimestamp,
                        vertexAttemptNumberStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }
    // set the basic properties
    try {
        // Set the execution plan in JSON format
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        // (if generating the JSON plan from the JobGraph fails, fall back to an empty plan)
        executionGraph.setJsonPlan("");
    }
    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits
    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);
    for (JobVertex vertex : jobGraph.getVertices()) {
        // Get the name of the vertex's invokable class, i.e. the task the vertex runs
        String executableClass = vertex.getInvokableClassName();
        // Every vertex must have an invokable class
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                    jobId,
                    "The vertex "
                            + vertex.getID()
                            + " ("
                            + vertex.getName()
                            + ") has no invokable class.");
        }
        try {
            // Run the vertex's master-side initialization, which depends on the vertex type
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                    jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                    t);
        }
    }
    log.info(
            "Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);
    // topologically sort the job vertices and attach the graph to the existing one
    // (all job vertices in data-flow order, starting from the sources)
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
                "Adding {} vertices from job graph {} ({}).",
                sortedTopology.size(),
                jobName,
                jobId);
    }
    // Attach all job vertices to the ExecutionGraph
    executionGraph.attachJobGraph(sortedTopology);
    if (log.isDebugEnabled()) {
        log.debug(
                "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }
    // configure the state checkpointing
    // (only when checkpointing is enabled)
    if (isCheckpointingEnabled(jobGraph)) {
        // ...
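getVerticesSortedTopologicallyFromSources returns the job vertices in data-flow order, so each vertex is attached only after all of its inputs. One standard way to compute such an order is Kahn's algorithm, sketched below on vertex names. This illustrates the technique and is not Flink's implementation; `TopologicalSortSketch` and the adjacency-map input format are assumptions. The sketch rejects cyclic graphs, since a cycle has no valid order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of topological sorting from the sources (Kahn's algorithm). */
class TopologicalSortSketch {
    /** edges maps each vertex to its downstream vertices; every vertex must appear as a key. */
    static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Count the inputs of every vertex, keeping the map's iteration order for determinism.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : edges.keySet()) {
            inDegree.put(vertex, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                if (!inDegree.containsKey(target)) {
                    throw new IllegalArgumentException("Unknown vertex: " + target);
                }
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Sources (vertices without inputs) are ready first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            // A downstream vertex becomes ready once all of its inputs have been emitted.
            for (String target : edges.get(vertex)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (sorted.size() != edges.size()) {
            throw new IllegalArgumentException("The graph contains a cycle.");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("Sink", List.of());
        edges.put("Map", List.of("Sink"));
        edges.put("Source", List.of("Map"));
        System.out.println(sortFromSources(edges));
    }
}
```

Even though "Sink" is inserted first in the example, the sort emits it last because it only becomes ready after "Map".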