Flink Source Code Analysis: Generating the JobGraph

Posted by darkness0604


Background

When writing Flink code, the pattern is fairly fixed: no matter how elaborate the middle of the job gets, it always starts by obtaining an execution environment and always ends with env.execute(). I knew that this call is what lets the client obtain the job's dataflow graph at submission time, but I was always curious: when exactly is that graph generated?

Source Code Walkthrough

Creating the JobGraph

We know that submitting a job involves a step that obtains the job's JobGraph:

    /**
     * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
     * PackagedProgram}.
     *
     * @param packagedProgram to extract the JobGraph from
     * @param configuration to use for the optimizer and job graph generator
     * @param defaultParallelism for the JobGraph
     * @param jobID the pre-generated job id
     * @return JobGraph extracted from the PackagedProgram
     * @throws ProgramInvocationException if the JobGraph generation failed
     */
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism,
            @Nullable JobID jobID,
            boolean suppressOutput)
            throws ProgramInvocationException {
        final Pipeline pipeline =
                getPipelineFromProgram(
                        packagedProgram, configuration, defaultParallelism, suppressOutput);
        final JobGraph jobGraph =
                FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                        packagedProgram.getUserCodeClassLoader(),
                        pipeline,
                        configuration,
                        defaultParallelism);
        if (jobID != null) {
            jobGraph.setJobID(jobID);
        }
        jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
        jobGraph.setClasspaths(packagedProgram.getClasspaths());
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

        return jobGraph;
    }

As we can see, a Pipeline is generated first: a structure that purely describes the chain of data operations. Combined with more concrete things such as the configuration and the parallelism, it becomes the JobGraph. So the main question is how the Pipeline is generated:

Creating the Pipeline

public static Pipeline getPipelineFromProgram(
            PackagedProgram program,
            Configuration configuration,
            int parallelism,
            boolean suppressOutput)
            throws CompilerException, ProgramInvocationException {
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();

        Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

        final PrintStream originalOut = System.out;
        final PrintStream originalErr = System.err;
        final ByteArrayOutputStream stdOutBuffer;
        final ByteArrayOutputStream stdErrBuffer;

        if (suppressOutput) {
            // temporarily write STDERR and STDOUT to a byte array.
            stdOutBuffer = new ByteArrayOutputStream();
            System.setOut(new PrintStream(stdOutBuffer));
            stdErrBuffer = new ByteArrayOutputStream();
            System.setErr(new PrintStream(stdErrBuffer));
        } else {
            stdOutBuffer = null;
            stdErrBuffer = null;
        }

        // temporary hack to support the optimizer plan preview
        OptimizerPlanEnvironment benv =
                new OptimizerPlanEnvironment(
                        configuration, program.getUserCodeClassLoader(), parallelism);
        benv.setAsContext();

        // for now we only care about StreamPlanEnvironment, used for streaming jobs
        StreamPlanEnvironment senv =
                new StreamPlanEnvironment(
                        configuration, program.getUserCodeClassLoader(), parallelism);

        // install it as the context environment
        senv.setAsContext();

        try {
            // reflectively invoke the main method of the specified class in the user's jar
            program.invokeInteractiveModeForExecution();
        } catch (Throwable t) {
            if (benv.getPipeline() != null) {
                return benv.getPipeline();
            }
            // Question: how is this pipeline obtained? Why does reflectively invoking
            // the user code jar make it available? Read on!
            if (senv.getPipeline() != null) {
                return senv.getPipeline();
            }

            if (t instanceof ProgramInvocationException) {
                throw t;
            }

            throw generateException(
                    program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
        } finally {
            benv.unsetAsContext();
            senv.unsetAsContext();
            if (suppressOutput) {
                System.setOut(originalOut);
                System.setErr(originalErr);
            }
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }

        throw generateException(
                program,
                "The program plan could not be fetched - the program aborted pre-maturely.",
                null,
                stdOutBuffer,
                stdErrBuffer);
    }
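The stdout/stderr redirection in getPipelineFromProgram is a reusable pattern on its own: swap System.out for a buffering stream while the user program runs, then restore it in a finally block, so the captured output can be attached to an error message if something fails. Below is a minimal, self-contained sketch of that pattern; the names (SuppressOutputDemo, runSuppressed) are illustrative, not Flink API:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class SuppressOutputDemo {

    // Runs the given program with System.out redirected into a byte buffer,
    // restoring the original stream afterwards, and returns the captured output.
    public static String runSuppressed(Runnable userProgram) {
        PrintStream originalOut = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer));
        try {
            userProgram.run();
        } finally {
            // always restore, just like the finally block in getPipelineFromProgram
            System.setOut(originalOut);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String captured = runSuppressed(() -> System.out.println("hello from user code"));
        System.out.println("captured: " + captured.trim());
    }
}
```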

Setting the Context

As you can see, before the main method of the user jar is invoked, the StreamPlanEnvironment is installed as the context:

// for now we only care about StreamPlanEnvironment, used for streaming jobs
StreamPlanEnvironment senv =
        new StreamPlanEnvironment(
                configuration, program.getUserCodeClassLoader(), parallelism);
// step into this call and see what it does
senv.setAsContext();

org.apache.flink.client.program.StreamPlanEnvironment#setAsContext
public void setAsContext() {
        StreamExecutionEnvironmentFactory factory =
                conf -> {
                    this.configure(conf, getUserClassloader());
                    return this;
                };
        initializeContextEnvironment(factory);
    }

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#initializeContextEnvironment
// note: the fields initialized here live in org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

Keep these two fields firmly in mind: they are set precisely so that, when the user jar's main method is invoked reflectively, its final execute() call hooks back into this environment.

Reflection: the User Code Is Invoked

Once the context is set up, the main method of the user's jar is invoked. For now, let's ignore the job's concrete logic and focus on just two parts:

Obtaining the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

Let's step into this call and see what it does:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment
// note: this method lives in the very same class, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

Do you see it? This is the same class in which the context was installed a moment ago, and threadLocalContextEnvironmentFactory and contextEnvironmentFactory are two static fields:

/**
 * The environment of the context (local by default, cluster if invoked through command line).
 */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory>
        threadLocalContextEnvironmentFactory = new ThreadLocal<>();

What does this tell us? It tells us that the environment obtained inside the user code is exactly the one produced by the context factory installed earlier, and the environment that factory returns is the StreamPlanEnvironment!
This is exactly the mechanism that hooks Pipeline generation at deployment time up to the user code.
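The hand-off described above can be reduced to a small, self-contained sketch. Everything here (ContextHookDemo, EnvFactory, Env) is an illustrative stand-in, not the real Flink API: the "deployer" installs a factory into static/ThreadLocal state, and the "user code" later asks for an environment and unknowingly receives the plan-capturing one.

```java
import java.util.function.Function;

public class ContextHookDemo {
    interface EnvFactory { Env create(String conf); }

    static class Env {
        final String name;
        Env(String name) { this.name = name; }
    }

    // mirrors contextEnvironmentFactory / threadLocalContextEnvironmentFactory
    private static EnvFactory contextFactory = null;
    private static final ThreadLocal<EnvFactory> threadLocalFactory = new ThreadLocal<>();

    // mirrors setAsContext(): install the factory before invoking user code
    static void setAsContext(EnvFactory f) {
        contextFactory = f;
        threadLocalFactory.set(f);
    }

    static void unsetAsContext() {
        contextFactory = null;
        threadLocalFactory.remove();
    }

    // mirrors getExecutionEnvironment(): prefer the installed factory, else a local default
    static Env getExecutionEnvironment(String conf) {
        EnvFactory f = threadLocalFactory.get() != null ? threadLocalFactory.get() : contextFactory;
        return f != null ? f.create(conf) : new Env("local:" + conf);
    }

    public static void main(String[] args) {
        // without an installed context: falls back to a "local" environment
        Env local = getExecutionEnvironment("c1");

        // deployer installs a plan-capturing environment, then user code asks for one
        Env planEnv = new Env("plan");
        setAsContext(conf -> planEnv);
        Env fromUserCode = getExecutionEnvironment("c2");
        unsetAsContext();

        System.out.println(local.name + " " + (fromUserCode == planEnv)); // prints: local:c1 true
    }
}
```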

execute

env.execute();

Good. Since we now know that env here is really the StreamPlanEnvironment, let's look at the implementation directly:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#execute(java.lang.String)
public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        // build the stream graph and call execute with it
        return execute(getStreamGraph(jobName));
    }

Let's keep following the execute method:

org.apache.flink.client.program.StreamContextEnvironment#execute

    @Override
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        // keep following this call:
        final JobClient jobClient = executeAsync(streamGraph);
        final List<JobListener> jobListeners = getJobListeners();

        try {
            final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
            return jobExecutionResult;
        } catch (Throwable t) {
            jobListeners.forEach(
                    jobListener ->
                            jobListener.onJobExecuted(
                                    null, ExceptionUtils.stripExecutionException(t)));
            ExceptionUtils.rethrowException(t);

            // never reached, only make javac happy
            return null;
        }
    }

org.apache.flink.client.program.StreamContextEnvironment#executeAsync

    @Override
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        validateAllowedExecution();
        // keep following this call
        final JobClient jobClient = super.executeAsync(streamGraph);

        if (!suppressSysout) {
            System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
        }

        return jobClient;
    }

org.apache.flink.client.program.StreamPlanEnvironment#executeAsync

    @Override
    public JobClient executeAsync(StreamGraph streamGraph) {
        // the streamGraph is the pipeline
        pipeline = streamGraph;

        // do not go on with anything now!
        throw new ProgramAbortException();
    }

I used to find this puzzling: the deployment phase should only fetch the job's dataflow graph, yet if execution followed the normal path, wouldn't it actually start the job, at entirely the wrong time? Now I finally see it: once the pipeline has been captured, the submission is cut short by deliberately throwing an exception! With that in mind, look back at the pipeline-creation code:

        try {
            // reflectively invoke the main method of the specified class in the user's jar
            program.invokeInteractiveModeForExecution();
        } catch (Throwable t) {
            if (benv.getPipeline() != null) {
                return benv.getPipeline();
            }
            // this is where the pipeline captured just before the ProgramAbortException surfaces
            if (senv.getPipeline() != null) {
                return senv.getPipeline();
            }

            if (t instanceof ProgramInvocationException) {
                throw t;
            }

            throw generateException(
                    program, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
        } finally {
            benv.unsetAsContext();
            senv.unsetAsContext();
            if (suppressOutput) {
                System.setOut(originalOut);
                System.setErr(originalErr);
            }
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }

        throw generateException(
                program,
                "The program plan could not be fetched - the program aborted pre-maturely.",
                null,
                stdOutBuffer,
                stdErrBuffer);

So the groundwork had been laid all along: the control flow is driven by an exception. Mystery solved!
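The whole "capture the plan, then abort" round trip can be sketched in a few lines. The classes below (AbortOnPlanDemo, PlanEnv) are simplified stand-ins for the real Flink classes: executeAsync() stores the graph and throws, and the caller recovers the captured pipeline from the catch block instead of treating the exception as a failure.

```java
public class AbortOnPlanDemo {
    static class ProgramAbortException extends RuntimeException {}

    // stands in for StreamPlanEnvironment
    static class PlanEnv {
        private Object pipeline;
        Object getPipeline() { return pipeline; }

        // stands in for StreamPlanEnvironment#executeAsync
        void executeAsync(Object streamGraph) {
            pipeline = streamGraph;               // capture the graph...
            throw new ProgramAbortException();    // ...and stop the user program here
        }
    }

    // stands in for the reflective call into the user jar's main()
    static void invokeUserMain(PlanEnv env) {
        Object streamGraph = "stream-graph";      // the user's dataflow, built before execute()
        env.executeAsync(streamGraph);            // the user code's env.execute() ends up here
        System.out.println("never reached");      // the abort prevents a real submission
    }

    // stands in for getPipelineFromProgram
    static Object getPipelineFromProgram() {
        PlanEnv env = new PlanEnv();
        try {
            invokeUserMain(env);
        } catch (Throwable t) {
            if (env.getPipeline() != null) {
                return env.getPipeline();         // the exception-as-control-flow hand-off
            }
            throw new RuntimeException("program aborted without producing a plan", t);
        }
        throw new RuntimeException("plan could not be fetched");
    }

    public static void main(String[] args) {
        System.out.println(getPipelineFromProgram()); // prints: stream-graph
    }
}
```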
