[flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode Analysis, from Script to Main Class

Posted by 九师兄


1. Overview

Reposted from: Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode Analysis [Part 1]. This is a supplementary column; please refer to the original article.

2. Preface

This post analyzes the code path of the yarn-per-job deployment mode, tracing what actually happens from the submit script to the main class.

3. Program Startup Entry

3.1. Start the data server

First, start a data server with netcat:

nc -lk 9999

3.2. Submit the Flink job

Next, submit a Flink job that consumes the data on port 9999. Execute the following:

cd ${FLINK_HOME}
flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --port 9999

BoYi-Pro:flink-1.12.2 sysadmin$ flink run -t yarn-per-job  -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount  examples/streaming/SocketWindowWordCount.jar --port 9999

2021-03-11 18:20:48,389 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/tools/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-03-11 18:20:48,450 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at /0.0.0.0:8032
2021-03-11 18:20:48,634 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-03-11 18:20:48,818 INFO  org.apache.hadoop.conf.Configuration                         [] - resource-types.xml not found
2021-03-11 18:20:48,820 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to find 'resource-types.xml'.
2021-03-11 18:20:48,905 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-03-11 18:20:48,905 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-03-11 18:20:48,906 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-03-11 18:20:53,188 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1615446205104_0004
2021-03-11 18:20:53,284 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1615446205104_0004
2021-03-11 18:20:53,288 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2021-03-11 18:20:53,297 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2021-03-11 18:21:08,269 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2021-03-11 18:21:08,272 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface 192.168.8.188:62520 of application 'application_1615446205104_0004'.
Job has been submitted with JobID fed09b80388db7019598ef707fafe017
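
A note on the two memory warnings above: YARN allocates containers only in multiples of yarn.scheduler.minimum-allocation-mb (1024 MB here), so the 1600 MB requested for the JobManager is rounded up to ceil(1600 / 1024) × 1024 = 2048 MB, and the 1728 MB TaskManager likewise to 2048 MB. The differences (448 MB and 320 MB) are reserved by YARN but not used by Flink.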



The YARN web UI then shows the running application (screenshot omitted in this repost).
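
For reference, the class passed with -c is the SocketWindowWordCount example bundled with the distribution. Condensed, its logic looks roughly like this (a sketch of the bundled example with the host hard-coded to localhost; not the verbatim source):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // "--port 9999" from the submit command is parsed here
        final int port = ParameterTool.fromArgs(args).getInt("port");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from the netcat server started in 3.1
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.word)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the 5-second windowed counts with parallelism 1
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    /** POJO holding a word and its count. */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }
}

Every five seconds the job prints the counts of the words typed into the nc session.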

4. The flink Command Entry

4.1. The submit command

Let's look again at the command used to submit the job:

flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --port 9999

The job goes through the flink run script, which ultimately assembles and executes the following Java command:


${JAVA_HOME}/bin/java \
    -Dlog.file=${FLINK_HOME}/log/flink-sysadmin-client-BoYi-Pro.local.log \
    -Dlog4j.configuration=file:${FLINK_HOME}/conf/log4j-cli.properties \
    -Dlog4j.configurationFile=file:${FLINK_HOME}/conf/log4j-cli.properties \
    -Dlogback.configurationFile=file:${FLINK_HOME}/conf/logback.xml \
    -classpath ${FLINK_HOME}/*.jar::/opt/tools/hadoop-3.2.1/etc/hadoop::/opt/tools/hbase-2.0.2/conf \
    org.apache.flink.client.cli.CliFrontend \
    run \
    -t yarn-per-job \
    -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
    examples/streaming/SocketWindowWordCount.jar --port 9999


From this we can see that the actual entry point for job submission is:

org.apache.flink.client.cli.CliFrontend

Three processes are involved in the flow that follows:


CliFrontend
	Parses the command-line arguments
	Builds the CommandLines: three of them, added in order (GenericCLI > FlinkYarnSessionCli > DefaultCLI)
	Assembles the configuration
	Executes the user code: execute()
	Generates the StreamGraph
	Executor: generates the JobGraph
	Cluster descriptor: uploads the jars and configuration, assembles the command submitted to YARN
	YarnClient submits the application

YarnJobClusterEntrypoint: the entry class executed by the ApplicationMaster
	1. Creates and starts the Dispatcher
	2. Creates and starts the ResourceManager, which holds a SlotManager (the component that actually manages resources and requests them from YARN)
	3. The Dispatcher starts the JobMaster, which generates the ExecutionGraph (and holds a SlotPool, the component that actually sends the slot requests)
	4. The SlotPool requests slots from the SlotManager, and the SlotManager requests containers from YARN (starting new nodes)

YarnTaskExecutorRunner: the TaskManager entry class in YARN mode
	1. Starts the TaskExecutor
	2. Registers its slots with the ResourceManager
	3. The ResourceManager allocates the slots
	4. The TaskExecutor receives the allocation and offers the slots to the JobMaster (SlotPool)
	5. The JobMaster submits tasks to the TaskExecutor for execution

4.2. Preparing the DEBUG environment

Add the following environment variables; otherwise the client fails with an error that FLINK_CONF_DIR cannot be found:

export FLINK_CONF_DIR=/opt/tools/flink-1.12.2/conf
export PATH=$PATH:$FLINK_CONF_DIR
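
This variable matters because CliFrontend resolves the configuration directory from FLINK_CONF_DIR first, and only then falls back to ../conf and conf relative to the working directory. A paraphrased sketch of the lookup (CliFrontend#getConfigurationDirectoryFromEnv, Flink 1.12.x; not the verbatim source):

    // Paraphrased: FLINK_CONF_DIR wins; otherwise try ../conf, then conf; otherwise fail.
    public static String getConfigurationDirectoryFromEnv() {
        String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR); // "FLINK_CONF_DIR"

        if (location != null) {
            if (new File(location).exists()) {
                return location;
            }
            throw new RuntimeException(
                    "The configuration directory '" + location + "', specified in the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable, does not exist.");
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { // "../conf"
            location = CONFIG_DIRECTORY_FALLBACK_1;
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { // "conf"
            location = CONFIG_DIRECTORY_FALLBACK_2;
        } else {
            throw new RuntimeException(
                    "The configuration directory was not specified. Please specify the directory "
                            + "containing the configuration file through the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
        }
        return location;
    }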

To debug, the Hadoop dependency jars must also be added to the classpath; otherwise an exception complaining about a missing HADOOP_CLASSPATH is thrown.


If a class-not-found error for FlinkYarnSessionCli is reported, add flink-dist_2.11-1.12.2.jar to the classpath as well:

/opt/tools/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar

4.3. Build a test entry class: CliFrontendTest
Create a CliFrontendTest class under the test sources to drive debugging.

package boyi;

import org.apache.flink.client.cli.CliFrontend;

public class CliFrontendTest {

    public static void main(String[] args) {

        // Same arguments as the command-line submission above.
        String[] param = new String[] {
            "run",
            "-t", "yarn-per-job",
            "-c", "org.apache.flink.streaming.examples.socket.SocketWindowWordCount",
            "/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar",
            "--port", "9999"
        };

        CliFrontend.main(param);
    }
}
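
Note: when launching CliFrontendTest from the IDE, the run configuration needs the same environment (FLINK_CONF_DIR) and classpath additions (the Hadoop dependencies and flink-dist_2.11-1.12.2.jar) described in 4.2, or the same "not found" errors will surface here.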


5. CliFrontend


5.1. CliFrontend#main
Let's start from the program's entry point.


    /**
     * env:
     * FLINK_CONF_DIR=/opt/tools/flink-1.12.2/conf;log.file=/opt/tools/flink-1.12.2/log/flink-sysadmin-client-BoYi-Pro.local.log ;log4j.configuration=file:/opt/tools/flink-1.12.2/conf/log4j-cli.properties ;log4j.configurationFile=file:/opt/tools/flink-1.12.2/conf/log4j-cli.properties ;logback.configurationFile=file:/opt/tools/flink-1.12.2/conf/logback.xml
     *
     * invoked as:
     * org.apache.flink.client.cli.CliFrontend
     * run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount /opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar --port 9999
     *
     * Submits the job based on the arguments.
     */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory,
        //    e.g. /opt/tools/flink-1.12.2/conf
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration (flink-conf.yaml), e.g.:
        //    "taskmanager.memory.process.size" -> "1728m"
        //    "parallelism.default" -> "1"
        //    "jobmanager.execution.failover-strategy" -> "region"
        //    "jobmanager.rpc.address" -> "localhost"
        //    "taskmanager.numberOfTaskSlots" -> "1"
        //    "jobmanager.memory.process.size" -> "1600m"
        //    "jobmanager.rpc.port" -> "6123"
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        try {
            // Build the CliFrontend; the CommandLines, in order: GenericCLI > FlinkYarnSessionCli > DefaultCLI
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));

            // Parse the arguments and run the requested action within the security context
            int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            System.exit(retCode);


        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(31);
        }
    }
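
The ordering GenericCLI > FlinkYarnSessionCli > DefaultCLI mentioned in the comment above is established by loadCustomCommandLines. A paraphrased sketch (error handling condensed; not the verbatim source):

    public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();

        // 1. The generic CLI handles the unified "-t/--target" style options,
        //    which is why "-t yarn-per-job" is picked up by it first.
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        // 2. The YARN session CLI is loaded reflectively, so a missing flink-yarn
        //    dependency degrades gracefully instead of failing the client.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI, configuration, configurationDirectory, "y", "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        // 3. The default CLI is added last as the catch-all fallback.
        customCommandLines.add(new DefaultCLI());
        return customCommandLines;
    }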


5.2. CliFrontend#parseAndRun(args)
The parseAndRun method dispatches to a different handler depending on the requested action.

The supported actions:


    // actions
    private static final String ACTION_RUN = "run";
    private static final String ACTION_RUN_APPLICATION = "run-application";
    private static final String ACTION_INFO = "info";
    private static final String ACTION_LIST = "list";
    private static final String ACTION_CANCEL = "cancel";
    private static final String ACTION_STOP = "stop";
    private static final String ACTION_SAVEPOINT = "savepoint";

The code is as follows:


    /**
     * Parses the command line arguments and starts the requested action.
     *
     * @param args command line arguments of the client.
     * @return The return code of the program
     */
    public int parseAndRun(String[] args) {

        // check for action
        if (args.length < 1) {
            CliFrontendParser.printHelp(customCommandLines);
            System.out.println("Please specify an action.");
            return 1;
        }

        // get the action
        String action = args[0];

        // remove action from parameters
        final String[] params = Arrays.copyOfRange(args, 1, args.length);

        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    // handle the run action
                    run(params);
                    return 0;
                case ACTION_RUN_APPLICATION:
                    // handle the run-application action
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    // handle the list action
                    list(params);
                    return 0;
                case ACTION_INFO:
                    // handle the info action
                    info(params);
                    return 0;
                case ACTION_CANCEL:
                    // handle the cancel action
                    cancel(params);
                    return 0;
                case ACTION_STOP:
                    // handle the stop action
                    stop(params);
                    return 0;
                case ACTION_SAVEPOINT:
                    // handle the savepoint action
                    savepoint(params);
                    return 0;
                case "-h":
                case "--help":
                    CliFrontendParser.printHelp(customCommandLines);
                    return 0;
                case "-v":
                case "--version":
                    String version = EnvironmentInformation.getVersion();
                    String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                    System.out.print("Version: " + version);
                    System.out.println(
                            commitID.equals(EnvironmentInformation.UNKNOWN)
                                    ? ""
                                    : ", Commit ID: " + commitID);
                    return 0;
                default:
                    System.out.printf("\"%s\" is not a valid action.\n", action);
                    System.out.println();
                    System.out.println(
                            "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                    System.out.println();
                    System.out.println(
                            "Specify the version option (-v or --version) to print Flink version.");
                    System.out.println();
                    System.out.println(
                            "Specify the help option (-h or --help) to get help on the command.");
                    return 1;
            }
        } catch (CliArgsException ce) {
            return handleArgException(ce);
        } catch (ProgramParametrizationException ppe) {
            return handleParametrizationException(ppe);
        } catch (ProgramMissingJobException pmje) {
            return handleMissingJobException();
        } catch (Exception e) {
            return handleError(e);
        }
    }

5.3. CliFrontend#run
This method determines how the job is to be executed (the execution target, environment, user program, and so on) and finally submits it via executeProgram(effectiveConfiguration, program).


    /**
     * Executions the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        // Determine the active command line for this submission (in this run: the GenericCLI, since "-t yarn-per-job" was passed)
        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);


        // jobJars : 0 = file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        //    effectiveConfiguration = {Configuration@3429} "{taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=yarn-per-job, jobmanager.memory.process.size=1600m, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar], parallelism.default=1, task
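
        // NOTE: the configuration dump above is cut off in the repost. The rest of
        // run() is paraphrased below (Flink 1.12.x, error handling condensed):

        // Merge the configuration layers: flink-conf.yaml, the active command line
        // (yarn-per-job here) and the program options.
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        // Wrap the user jar and its entry class, then invoke the user's main() method;
        // this is where the yarn-per-job deployment is eventually triggered.
        final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);

        try {
            executeProgram(effectiveConfiguration, program);
        } finally {
            program.deleteExtractedLibraries();
        }
    }

Inside executeProgram the user's main() runs, the StreamGraph and JobGraph are produced, and the cluster descriptor uploads the jars and submits the application to YARN; the follow-up parts of this series cover those steps.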
