Flink Source Code Analysis: Generating the JobGraph
Posted by darkness0604
Background
In practice, Flink programs follow a fairly fixed pattern: however fancy the transformations in the middle get, they always begin by obtaining an execution environment and always end with env.execute(). I knew that this call is involved in producing the job's dataflow graph at submission time, but I had always wondered: when exactly is that graph generated?
Source Code Analysis
Creating the JobGraph
As we know, one step of submitting a job is obtaining its JobGraph:
/**
 * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
 * PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
 * @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final Pipeline pipeline =
            getPipelineFromProgram(
                    packagedProgram, configuration, defaultParallelism, suppressOutput);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                    packagedProgram.getUserCodeClassLoader(),
                    pipeline,
                    configuration,
                    defaultParallelism);

    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

    return jobGraph;
}
As we can see, a Pipeline is generated first. This is a pure description of the chain of data operations; combined with more concrete settings such as the configuration and the default parallelism, it becomes a JobGraph. So the key question is how the Pipeline itself is produced:
Creating the Pipeline
public static Pipeline getPipelineFromProgram(
        PackagedProgram program,
        Configuration configuration,
        int parallelism,
        boolean suppressOutput)
        throws CompilerException, ProgramInvocationException {
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());
    final PrintStream originalOut = System.out;
    final PrintStream originalErr = System.err;
    final ByteArrayOutputStream stdOutBuffer;
    final ByteArrayOutputStream stdErrBuffer;

    if (suppressOutput) {
        // temporarily write STDERR and STDOUT to a byte array.
        stdOutBuffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(stdOutBuffer));
        stdErrBuffer = new ByteArrayOutputStream();
        System.setErr(new PrintStream(stdErrBuffer));
    } else {
        stdOutBuffer = null;
        stdErrBuffer = null;
    }

    // temporary hack to support the optimizer plan preview
    OptimizerPlanEnvironment benv =
            new OptimizerPlanEnvironment(
                    configuration, program.getUserCodeClassLoader(), parallelism);
    benv.setAsContext();
    // for now, we only look at StreamPlanEnvironment, which handles streaming jobs
    StreamPlanEnvironment senv =
            new StreamPlanEnvironment(
                    configuration, program.getUserCodeClassLoader(), parallelism);
    // install it as the context environment
    senv.setAsContext();

    try {
        // reflectively invoke the main method of the specified class in the user jar
        program.invokeInteractiveModeForExecution();
    } catch (Throwable t) {
        if (benv.getPipeline() != null) {
            return benv.getPipeline();
        }
        // question: how was this pipeline obtained? Why does reflectively
        // invoking the user jar make it available? Read on!
        if (senv.getPipeline() != null) {
            return senv.getPipeline();
        }
        if (t instanceof ProgramInvocationException) {
            throw t;
        }
        throw generateException(
                program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
    } finally {
        benv.unsetAsContext();
        senv.unsetAsContext();
        if (suppressOutput) {
            System.setOut(originalOut);
            System.setErr(originalErr);
        }
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }

    throw generateException(
            program,
            "The program plan could not be fetched - the program aborted pre-maturely.",
            null,
            stdOutBuffer,
            stdErrBuffer);
}
Setting up the context
As shown above, before the main method of the user jar is invoked, the StreamPlanEnvironment is first installed as the context environment:
// for now, we only look at StreamPlanEnvironment, which handles streaming jobs
StreamPlanEnvironment senv =
        new StreamPlanEnvironment(
                configuration, program.getUserCodeClassLoader(), parallelism);
// step into this method to see what it does
senv.setAsContext();
org.apache.flink.client.program.StreamPlanEnvironment#setAsContext
public void setAsContext() {
    StreamExecutionEnvironmentFactory factory =
            conf -> {
                this.configure(conf, getUserClassloader());
                return this;
            };
    initializeContextEnvironment(factory);
}
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#initializeContextEnvironment
// important: the fields initialized here are declared in
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
Keep these two fields firmly in mind: they are what later hooks the execute() call at the end of the user jar's main method back into this deployment flow.
Reflection: invoking the user code
With the context in place, the main method of the user's jar is invoked. Ignoring the concrete job logic for now, we only care about two parts:
Obtaining the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
Let's step in and see what it does:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
Did you notice? This is the very class in which the context was installed a moment ago, and threadLocalContextEnvironmentFactory and contextEnvironmentFactory are two static fields:
/**
 * The environment of the context (local by default, cluster if invoked through command line).
 */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory>
        threadLocalContextEnvironmentFactory = new ThreadLocal<>();
What does this tell us? The environment obtained inside the user code is exactly the one produced by the context factory registered earlier, and that factory hands back the StreamPlanEnvironment!
This is precisely the trick that, at deployment time, hooks Pipeline generation into the user code.
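The registration-then-lookup mechanism above can be boiled down to a few lines. The following is a minimal, self-contained sketch, not Flink code: the names (MiniEnv, ContextDemo and so on) are invented for illustration, and only the pattern of a static factory plus a ThreadLocal mirrors what StreamExecutionEnvironment does.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Stand-in for StreamExecutionEnvironment: user code asks it for an
// environment, and a previously registered factory can intercept that call.
class MiniEnv {
    // Mirrors the two static fields discussed above.
    private static Supplier<MiniEnv> contextFactory = null;
    private static final ThreadLocal<Supplier<MiniEnv>> threadLocalFactory =
            new ThreadLocal<>();

    final String name;

    MiniEnv(String name) {
        this.name = name;
    }

    // Mirrors initializeContextEnvironment(...): the deployer registers a
    // factory before invoking the user's main method.
    static void setAsContext(Supplier<MiniEnv> factory) {
        contextFactory = factory;
        threadLocalFactory.set(factory);
    }

    // Mirrors getExecutionEnvironment(): prefer the registered factory,
    // otherwise fall back to a "local" environment.
    static MiniEnv getExecutionEnvironment() {
        return Optional.ofNullable(threadLocalFactory.get())
                .or(() -> Optional.ofNullable(contextFactory))
                .map(Supplier::get)
                .orElseGet(() -> new MiniEnv("local"));
    }
}

public class ContextDemo {
    public static void main(String[] args) {
        // Without a registered factory, "user code" gets a local environment.
        System.out.println(MiniEnv.getExecutionEnvironment().name); // local

        // The deployer registers a plan-capturing environment; user code
        // invoked afterwards transparently receives that environment.
        MiniEnv.setAsContext(() -> new MiniEnv("plan"));
        System.out.println(MiniEnv.getExecutionEnvironment().name); // plan
    }
}
```

Because the fields are static (plus a ThreadLocal for per-thread isolation), no reference needs to be passed into the user's main method: the reflective call and the environment lookup meet through shared class state.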
execute
env.execute();
Good. Since we now know that env is in fact a StreamPlanEnvironment, let's look directly at the implementation:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(java.lang.String)
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // build the StreamGraph, then call execute on it
    return execute(getStreamGraph(jobName));
}
Following the execute method further:
org.apache.flink.client.program.StreamContextEnvironment#execute
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // keep following this method:
    final JobClient jobClient = executeAsync(streamGraph);
    final List<JobListener> jobListeners = getJobListeners();

    try {
        final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        jobListeners.forEach(
                jobListener ->
                        jobListener.onJobExecuted(
                                null, ExceptionUtils.stripExecutionException(t)));
        ExceptionUtils.rethrowException(t);
        // never reached, only make javac happy
        return null;
    }
}
org.apache.flink.client.program.StreamContextEnvironment#executeAsync
@Override
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    validateAllowedExecution();
    // keep following this call:
    final JobClient jobClient = super.executeAsync(streamGraph);

    if (!suppressSysout) {
        System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
    }
    return jobClient;
}
org.apache.flink.client.program.StreamPlanEnvironment#executeAsync
@Override
public JobClient executeAsync(StreamGraph streamGraph) {
    // the StreamGraph is the pipeline
    pipeline = streamGraph;

    // do not go on with anything now!
    throw new ProgramAbortException();
}
This used to puzzle me: at the deployment stage we should merely be fetching the job's dataflow graph, yet if the normal flow continued, the job would actually be started, and that would be the wrong moment. Only now do I see it: after the pipeline is captured, further submission is deliberately cut short by throwing an exception! With that, let's revisit the pipeline-creation code:
try {
    // reflectively invoke the main method of the specified class in the user jar
    program.invokeInteractiveModeForExecution();
} catch (Throwable t) {
    if (benv.getPipeline() != null) {
        return benv.getPipeline();
    }
    // this is where the pipeline captured before the abort is harvested
    if (senv.getPipeline() != null) {
        return senv.getPipeline();
    }
    if (t instanceof ProgramInvocationException) {
        throw t;
    }
    throw generateException(
            program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
} finally {
    benv.unsetAsContext();
    senv.unsetAsContext();
    if (suppressOutput) {
        System.setOut(originalOut);
        System.setErr(originalErr);
    }
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}

throw generateException(
        program,
        "The program plan could not be fetched - the program aborted pre-maturely.",
        null,
        stdOutBuffer,
        stdErrBuffer);
The groundwork had been laid all along: the control flow here is completed through a deliberately thrown exception. So that is how it works!
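This capture-then-abort pattern can likewise be sketched in isolation. The names below (PlanEnv, getPipeline and so on) are invented stand-ins, not Flink APIs; only the control flow mirrors what StreamPlanEnvironment#executeAsync and getPipelineFromProgram do together.

```java
// A self-contained sketch of exception-driven flow control: the plan
// environment throws a sentinel exception right after capturing the pipeline,
// and the deployer catches it and returns the captured plan instead of
// letting the job actually start.
public class AbortDemo {

    // Plays the role of Flink's ProgramAbortException.
    static class ProgramAbortException extends Error {}

    static class PlanEnv {
        String pipeline; // plays the role of the captured StreamGraph

        // Mirrors StreamPlanEnvironment#executeAsync: capture, then abort.
        void executeAsync(String streamGraph) {
            pipeline = streamGraph;
            // do not go on with anything now!
            throw new ProgramAbortException();
        }
    }

    // Mirrors getPipelineFromProgram: run the "user code" and harvest the
    // pipeline from the exception path.
    static String getPipeline(PlanEnv env) {
        try {
            // Stand-in for program.invokeInteractiveModeForExecution():
            // the user's main method eventually calls env.execute(...).
            env.executeAsync("source -> map -> sink");
        } catch (Throwable t) {
            if (env.pipeline != null) {
                // the "failure" was the planned abort, so this is success
                return env.pipeline;
            }
            throw new RuntimeException("The program caused an error", t);
        }
        // reached only if user code never called execute()
        throw new IllegalStateException("The program plan could not be fetched");
    }

    public static void main(String[] args) {
        System.out.println(getPipeline(new PlanEnv()));
        // prints: source -> map -> sink
    }
}
```

Note that the sentinel extends Error rather than Exception, so that a user's catch (Exception e) inside main cannot accidentally swallow the abort before it reaches the deployer.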