Flink 1.15 Source Code Analysis -- Job Submission Flow ---- flink run

Posted by 宝哥大数据


0. Preface

Submission method: running the flink command-line script.
An example of submitting a job with the flink script:

flink run ...

From the flink script we can see that the entry class is org.apache.flink.client.cli.CliFrontend:

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "$JAVA_RUN" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

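Its main job is to receive and parse the command passed on the command line and call the corresponding utility class to execute it.
It provides the following actions:

  • run: compiles and runs a program
  • cancel: cancels a running program (not recommended by the official docs)
  • stop: stops a running program with a savepoint (streaming jobs only)
  • savepoint: triggers a savepoint for a running job or disposes an existing savepoint
  • info: shows the program's execution plan (JSON)
  • list: lists running and scheduled programs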

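1. CliFrontend

Next, let's look at how main() carries out the run flow:

  • 1. Get the path of Flink's conf directory
  • 2. Load the configuration from that path
  • 3. Wrap the command-line interfaces, in order: Generic, Yarn, Default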
    /** Submits the job based on the arguments. */
    public static void main(final String[] args) {
        // Log basic environment information
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration flink-conf.yaml
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines, in order: Generic, Yarn, Default
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        int retCode = 31;
        try {
            // 4. create the CliFrontend object
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
            // 5. install the security modules
            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            // 6. match the command-line action in a switch, run the corresponding action
            //    and callbacks, and return a status code. This is the main logic.
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        } catch (Throwable t) {
            // ....
        } finally {
            // 7. exit the client process with the returned status code
            System.exit(retCode);
        }
    }
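Step 6 hands everything over to `parseAndRun`, which picks an action from the first command-line argument. The following is a minimal, self-contained sketch of that kind of dispatch, not Flink's actual code: the class `ActionDispatchSketch` and its placeholder handlers are hypothetical, and only the action names come from the list above. It shows the essential idea: the first argument selects the action, the remaining arguments are passed to that action, and its return value becomes the exit code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class ActionDispatchSketch {

    // Hypothetical handlers: each receives the arguments after the action name
    // and returns an exit code (0 = success).
    private final Map<String, Function<String[], Integer>> actions = new LinkedHashMap<>();

    public ActionDispatchSketch() {
        actions.put("run", params -> {
            System.out.println("run with " + Arrays.toString(params));
            return 0;
        });
        // Placeholders for the remaining actions
        actions.put("cancel", params -> 0);
        actions.put("stop", params -> 0);
        actions.put("savepoint", params -> 0);
        actions.put("info", params -> 0);
        actions.put("list", params -> 0);
    }

    /** Selects the handler by the first argument and passes it the remaining arguments. */
    public int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        Function<String[], Integer> handler = actions.get(action);
        if (handler == null) {
            System.out.printf("\"%s\" is not a valid action.%n", action);
            return 1;
        }
        return handler.apply(params);
    }

    public static void main(String[] args) {
        ActionDispatchSketch cli = new ActionDispatchSketch();
        int retCode = 31; // same initial value as retCode in CliFrontend.main() above
        try {
            retCode = cli.parseAndRun(new String[] {"run", "-d", "job.jar"});
        } finally {
            // The real client calls System.exit(retCode) here; the sketch only prints it.
            System.out.println("exit code: " + retCode);
        }
    }
}
```

Running `main` prints `run with [-d, job.jar]` followed by `exit code: 0`.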
        
    

The detailed flow is analyzed below.

1.1 Log basic environment information

EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

The implementation of logEnvironmentInfo:

	/**
	 * Logs information about the environment, like code revision, current user, Java version,
	 * and JVM parameters.
	 *
	 * @param log The logger to log the information to.
	 * @param componentName The component name to mention in the log.
	 * @param commandLineArgs The arguments accompanying the starting the component.
	 */
	public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
		if (log.isInfoEnabled()) {
			// ID and date of the last git commit of the code
			RevisionInformation rev = getRevisionInformation();
			// code version
			String version = getVersion();
			// JVM version, obtained via the JDK's built-in ManagementFactory class
			String jvmVersion = getJvmVersion();
			// JVM startup options, also obtained via the JDK's ManagementFactory class
			String[] options = getJvmStartupOptionsArray();
			// JAVA_HOME directory
			String javaHome = System.getenv("JAVA_HOME");
			// maximum JVM heap size in MiB
			long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;

			// log basic information
			log.info("--------------------------------------------------------------------------------");
			log.info(" Starting " + componentName + " (Version: " + version + ", "
					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
			log.info(" OS current user: " + System.getProperty("user.name"));
			log.info(" Current Hadoop/Kerberos user: " + getHadoopUser());
			log.info(" JVM: " + jvmVersion);
			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
			log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
			// Hadoop version information
			String hadoopVersionString = getHadoopVersionString();
			if (hadoopVersionString != null) {
				log.info(" Hadoop version: " + hadoopVersionString);
			} else {
				log.info(" No Hadoop Dependency available");
			}
			// JVM startup options
			if (options.length == 0) {
				log.info(" JVM Options: (none)");
			} else {
				log.info(" JVM Options:");
				for (String s : options) {
					log.info("    " + s);
				}
			}
			// program arguments
			if (commandLineArgs == null || commandLineArgs.length == 0) {
				log.info(" Program Arguments: (none)");
			} else {
				log.info(" Program Arguments:");
				for (String s : commandLineArgs) {
					log.info("    " + s);
				}
			}

			log.info(" Classpath: " + System.getProperty("java.class.path"));

			log.info("--------------------------------------------------------------------------------");
		}
	}
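The comments above say that the JVM version and startup options come from the JDK's `ManagementFactory`. The sketch below reproduces that part with standard JDK APIs only (`RuntimeMXBean` and `Runtime.maxMemory()`); the class name and the exact output format are illustrative assumptions, not Flink's implementation. It also makes the `>>> 20` conversion explicit: shifting a byte count right by 20 bits divides it by 2^20, i.e. converts bytes to MiB.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.List;

public class EnvironmentInfoSketch {

    /** Converts a byte count to MiB: ">>> 20" divides by 2^20 (unsigned shift). */
    static long toMebibytes(long bytes) {
        return bytes >>> 20;
    }

    /** Describes the running JVM using the standard RuntimeMXBean. */
    static String jvmVersion() {
        RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
        return bean.getVmName() + " - " + bean.getVmVendor() + " - "
                + bean.getSpecVersion() + '/' + bean.getVmVersion();
    }

    /** JVM startup options (-Xmx..., -D..., etc.); program arguments are not included. */
    static List<String> jvmStartupOptions() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments();
    }

    /**
     * Maximum heap in MiB. Runtime.maxMemory() returns Long.MAX_VALUE when there is
     * no inherent limit; this sketch does not special-case that.
     */
    static long maxHeapMegabytes() {
        return toMebibytes(Runtime.getRuntime().maxMemory());
    }

    public static void main(String[] args) {
        String javaHome = System.getenv("JAVA_HOME");
        System.out.println(" JVM: " + jvmVersion());
        System.out.println(" Maximum heap size: " + maxHeapMegabytes() + " MiBytes");
        System.out.println(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
        List<String> options = jvmStartupOptions();
        if (options.isEmpty()) {
            System.out.println(" JVM Options: (none)");
        } else {
            System.out.println(" JVM Options:");
            for (String s : options) {
                System.out.println("    " + s);
            }
        }
    }
}
```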
		
	

1.2 Get the Flink configuration directory

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

The Flink configuration directory is obtained from the FLINK_CONF_DIR environment variable.
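Beyond the environment variable, we believe `getConfigurationDirectoryFromEnv()` falls back to `../conf` and then `conf` (relative to the working directory) when `FLINK_CONF_DIR` is unset, and fails if none of them exists. The sketch below models that lookup under this assumption; `ConfDirSketch` and `resolveConfDir` are hypothetical names, and the environment is passed in as a `Map` so the logic can be tried without changing real environment variables.

```java
import java.io.File;
import java.util.Map;

public class ConfDirSketch {

    // Assumed fallback locations, relative to the client's working directory.
    static final String FALLBACK_1 = "../conf";
    static final String FALLBACK_2 = "conf";

    /**
     * Resolves the configuration directory: FLINK_CONF_DIR wins if it is set (and must exist),
     * otherwise the first existing fallback directory is used.
     */
    static String resolveConfDir(Map<String, String> env) {
        String location = env.get("FLINK_CONF_DIR");
        if (location != null) {
            if (new File(location).exists()) {
                return location;
            }
            throw new IllegalStateException(
                    "The configuration directory '" + location
                            + "', specified in FLINK_CONF_DIR, does not exist.");
        }
        if (new File(FALLBACK_1).exists()) {
            return FALLBACK_1;
        }
        if (new File(FALLBACK_2).exists()) {
            return FALLBACK_2;
        }
        throw new IllegalStateException(
                "The configuration directory was not specified; please set FLINK_CONF_DIR.");
    }

    public static void main(String[] args) {
        try {
            System.out.println("conf dir: " + resolveConfDir(System.getenv()));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```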

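1.3 Load the Flink configuration file and parse it into a Configuration object

Call GlobalConfiguration.loadConfiguration to load the settings from the Flink configuration file flink-conf.yaml, parse them, and convert them into a Configuration object: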

        // 2. load the global configuration flink-conf.yaml
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

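The implementation of loadConfiguration (the single-argument overload called from main() delegates to it with dynamicProperties = null):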

  /**
   * Loads the configuration files from the specified directory. If the dynamic properties
   * configuration is not null, then it is added to the loaded configuration.
   *
   * @param configDir directory to load the configuration from
   * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
   * @return The configuration loaded from the given configuration directory
   */
  public static Configuration loadConfiguration(
      final String configDir, @Nullable final Configuration dynamicProperties) {

    if (configDir == null) {
      throw new IllegalArgumentException(
          "Given configuration directory is null, cannot load configuration");
    }

    final File confDirFile = new File(configDir);
    if (!(confDirFile.exists())) {
      throw new IllegalConfigurationException(
          "The given configuration directory name '"
              + configDir
              + "' ("
              + confDirFile.getAbsolutePath()
              + ") does not describe an existing directory.");
    }
    /** 1. With the directory validated (non-null and existing), get the config file, i.e. Flink's flink-conf.yaml */
    // get Flink yaml configuration file
    final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

    if (!yamlConfigFile.exists()) {
      throw new IllegalConfigurationException(
          "The Flink config file '"
              + yamlConfigFile
              + "' ("
              + yamlConfigFile.getAbsolutePath()
              + ") does not exist.");
    }
    /** 2. [Core logic] Once the file is found, call loadYAMLResource to parse the YAML config file and return its key-value pairs as a Configuration */
    Configuration configuration = loadYAMLResource(yamlConfigFile);

    if (dynamicProperties != null) {
      configuration.addAll(dynamicProperties);
    }

    return enrichWithEnvironmentVariables(configuration);
  }
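In Flink 1.15, `loadYAMLResource` is, as far as we can tell, not a full YAML parser: it reads `flink-conf.yaml` line by line as flat `key: value` pairs. The sketch below imitates that approach with plain JDK classes and returns a `Map` instead of a `Configuration`; `FlatYamlSketch` is a hypothetical name, and details such as logging of skipped lines are left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class FlatYamlSketch {

    /** Parses flat "key: value" lines; comments, malformed and empty entries are skipped. */
    static Map<String, String> parse(String content) {
        Map<String, String> config = new LinkedHashMap<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // 1. drop everything after the first '#' (comment)
                String conf = line.split("#", 2)[0].trim();
                if (conf.isEmpty()) {
                    continue;
                }
                // 2. split key and value on the first ": "
                String[] kv = conf.split(": ", 2);
                if (kv.length == 1) {
                    continue; // not a "key: value" line, skip it
                }
                String key = kv[0].trim();
                String value = kv[1].trim();
                if (key.isEmpty() || value.isEmpty()) {
                    continue;
                }
                config.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Error parsing YAML configuration.", e);
        }
        return config;
    }

    public static void main(String[] args) {
        String yaml = String.join("\n",
                "# JobManager settings",
                "jobmanager.rpc.address: localhost",
                "jobmanager.rpc.port: 6123   # trailing comment",
                "taskmanager.numberOfTaskSlots: 4",
                "not a key value pair");
        System.out.println(parse(yaml));
    }
}
```

Running `main` prints `{jobmanager.rpc.address=localhost, jobmanager.rpc.port=6123, taskmanager.numberOfTaskSlots=4}`: the comment line and the line without `: ` are skipped. A consequence of this line-based approach is that nested YAML structures are not supported; every setting has to be a single flat key.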
  

1.4 Load the custom command lines (CustomCommandLine)

Call loadCustomCommandLines to load the custom command lines (CustomCommandLine):

       // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

loadCustomCommandLines works in the following steps:

  • 1. Create a GenericCLI
  • 2. Add the YARN command line via reflection
    • if that fails, add FallbackYarnSessionCli instead
  • 3. Add DefaultCLI
    public static List<CustomCommandLine> loadCustomCommandLines(
            Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine> customCommandLines = new ArrayList<>();
        // 1. create a GenericCLI
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

        //	2. Command line interface of the YARN session, with a special initialization here
        //	to prefix all options with y/yarn.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            // 3. add the YARN command line
            customCommandLines.add(
                    loadCustomCommandLine(
                            flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
            try {
                LOG.info("Loading FallbackYarnSessionCli");
                // 4. on failure, add FallbackYarnSessionCli instead
                customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));
            } catch (Exception exception) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
        }

        //	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get
        // the active CustomCommandLine in order and DefaultCLI isActive always return true.
        // 5. add DefaultCLI
        customCommandLines.add(new DefaultCLI());

        return customCommandLines;
    }

The class relationships are as follows:

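When a later section obtains the active command-line client, the candidates are exactly the three clients registered here: GenericCLI, FlinkYarnSessionCli and DefaultCLI. They are checked in order; the first one that is active is used, the check stops there, and that client is returned. The activation rules are:

  • GenericCLI: used if execution.target is configured, or any of -e, --executor, -t, --target is passed, with a non-null value.
  • FlinkYarnSessionCli: used if the value of -m/--jobmanager is yarn-cluster, or a YARN applicationId is passed in the arguments, or execution.target is yarn-session or yarn-per-job.
  • DefaultCLI: isActive always returns true; used for standalone mode.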
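The "first active one wins" rule can be sketched as a loop over the registered command lines. `ActiveCliSketch` below is a hypothetical model: it reduces the parsed command line and configuration to a `Map` of option names to values and encodes the activation rules listed above, rather than using Flink's `CustomCommandLine.isActive(CommandLine)`.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ActiveCliSketch {

    /** Hypothetical stand-in for a CustomCommandLine: a name plus an activation check. */
    static final class Cli {
        final String name;
        final Predicate<Map<String, String>> activation;

        Cli(String name, Predicate<Map<String, String>> activation) {
            this.name = name;
            this.activation = activation;
        }
    }

    // Registration order matters: the first active entry wins, so DefaultCLI must stay last.
    static final List<Cli> CLIS = List.of(
            new Cli("GenericCLI", o -> o.get("execution.target") != null
                    || o.get("executor") != null
                    || o.get("target") != null),
            new Cli("FlinkYarnSessionCli", o -> "yarn-cluster".equals(o.get("jobmanager"))
                    || o.get("applicationId") != null
                    || "yarn-session".equals(o.get("execution.target"))
                    || "yarn-per-job".equals(o.get("execution.target"))),
            new Cli("DefaultCLI", o -> true));

    /** Returns the name of the first CLI whose activation check passes. */
    static String activeCli(Map<String, String> options) {
        for (Cli cli : CLIS) {
            if (cli.activation.test(options)) {
                return cli.name;
            }
        }
        // Unreachable in this sketch, because DefaultCLI is always active.
        throw new IllegalStateException("No active command line found.");
    }

    public static void main(String[] args) {
        System.out.println(activeCli(Map.of("jobmanager", "yarn-cluster"))); // FlinkYarnSessionCli
        System.out.println(activeCli(Map.of("target", "kubernetes-application"))); // GenericCLI
        System.out.println(activeCli(Map.of())); // DefaultCLI
        System.out.println(activeCli(Map.of("execution.target", "yarn-per-job"))); // GenericCLI
    }
}
```

The last call shows why order matters: because `execution.target` is also a GenericCLI trigger and GenericCLI is checked first, it wins even for a YARN target.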

1.4.1 Create a GenericCLI

        // 1. create a GenericCLI
        customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

1.4.2 Add the YARN command line via reflection


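Build the YARN command line via reflection. Note that the constructor is looked up by the runtime classes of the arguments passed in loadCustomCommandLines, i.e. (Configuration, String, String, String), so the call resolves FlinkYarnSessionCli's four-argument constructor; that overload in turn delegates to the full six-argument constructor shown further below.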

/**
  * Builds a command line via reflection.
  * @param className fully qualified name of the class to load
  * @param params constructor arguments
  */
private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {

    // 1. load the class from the classpath; it must implement the CustomCommandLine interface
    Class<? extends CustomCommandLine> customCliClass =
        Class.forName(className).asSubclass(CustomCommandLine.class);

    // 2. derive the parameter types from the runtime classes of the arguments
    Class<?>[] types = new Class<?>[params.length];
    for (int i = 0; i < params.length; i++) {
        Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
        types[i] = params[i].getClass();
    }
    // 3. look up the matching constructor, e.g. of org.apache.flink.yarn.cli.FlinkYarnSessionCli
    Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);

    // 4. invoke the constructor (e.g. FlinkYarnSessionCli's) to create the instance
    return constructor.newInstance(params);
}
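The reflective loading and its fallback can be reproduced with plain JDK reflection. In the sketch below, `CommandLineLike`, `PrefixedCli`, `FallbackCli` and `com.example.MissingCli` are hypothetical stand-ins for `CustomCommandLine`, `FlinkYarnSessionCli`, `FallbackYarnSessionCli` and a class missing from the classpath. Note that `getConstructor` matches the exact parameter types, so deriving them from `params[i].getClass()` only works when each argument's runtime class equals the declared parameter type.

```java
import java.lang.reflect.Constructor;
import java.util.Objects;

public class ReflectiveCliLoaderSketch {

    /** Hypothetical stand-in for CustomCommandLine. */
    public interface CommandLineLike {
        String id();
    }

    /** Hypothetical implementation with a public (String, String) constructor. */
    public static class PrefixedCli implements CommandLineLike {
        private final String shortPrefix;
        private final String longPrefix;

        public PrefixedCli(String shortPrefix, String longPrefix) {
            this.shortPrefix = shortPrefix;
            this.longPrefix = longPrefix;
        }

        @Override
        public String id() {
            return "prefixed(" + shortPrefix + "/" + longPrefix + ")";
        }
    }

    /** Hypothetical fallback, used when the preferred class cannot be loaded. */
    public static class FallbackCli implements CommandLineLike {
        @Override
        public String id() {
            return "fallback";
        }
    }

    /** Loads className reflectively, looking up the constructor by the runtime classes of params. */
    static CommandLineLike load(String className, Object... params) throws ReflectiveOperationException {
        Class<? extends CommandLineLike> cliClass =
                Class.forName(className).asSubclass(CommandLineLike.class);
        Class<?>[] types = new Class<?>[params.length];
        for (int i = 0; i < params.length; i++) {
            Objects.requireNonNull(params[i], "Parameters for custom command-lines may not be null.");
            types[i] = params[i].getClass(); // exact runtime class, not a declared supertype
        }
        Constructor<? extends CommandLineLike> constructor = cliClass.getConstructor(types);
        return constructor.newInstance(params);
    }

    /** Mirrors the try-then-fallback pattern of loadCustomCommandLines. */
    static CommandLineLike loadOrFallback(String className, Object... params) {
        try {
            return load(className, params);
        } catch (ReflectiveOperationException | LinkageError e) {
            return new FallbackCli();
        }
    }

    public static void main(String[] args) {
        // Binary name of the nested class, e.g. "ReflectiveCliLoaderSketch$PrefixedCli"
        String name = PrefixedCli.class.getName();
        System.out.println(loadOrFallback(name, "y", "yarn").id());
        System.out.println(loadOrFallback("com.example.MissingCli", "y", "yarn").id());
    }
}
```

Running `main` prints `prefixed(y/yarn)` and then `fallback`.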


	/**
	 * Initializes a FlinkYarnSessionCli.
	 * @param configuration  the global configuration
	 * @param clusterClientServiceLoader  the service loader used to discover cluster client factories
	 * @param configurationDirectory  the global configuration directory
	 * @param shortPrefix   short prefix for the command-line options
	 * @param longPrefix    long prefix for the command-line options
	 * @param acceptInteractiveInput  whether interactive input is accepted
	 * @throws FlinkException
	 */
    public FlinkYarnSessionCli(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            String configurationDirectory,
            String shortPrefix,
            String longPrefix,
            boolean acceptInteractiveInput)
            throws FlinkException {
        // 1. initialize the fields
        super(configuration, shortPrefix, longPrefix);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
        this.configurationDirectory = checkNotNull(configurationDirectory);
        this.acceptInteractiveInput = acceptInteractiveInput;

        // 2. Create the command line options
        query =
                new Option(
                        shortPrefix + "q",
                        longPrefix + "query",
                        false,
                        "Display available YARN resources (memory, cores)");
        queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
        shipPath =
                new Option(
                        shortPrefix + "t",
                        longPrefix + "ship",
                        true,
                        "Ship files in the specified directory (t for transfer)");
        flinkJar =
                new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
        jmMemory =
                new Option(
                        shortPrefix + "jm",
                        longPrefix + "jobManagerMemory",
                        true,
                        "Memory for JobManager Container with optional unit (default: MB)");
        tmMemory =
                new Option(
                        shortPrefix + "tm",
                        longPrefix + "taskManagerMemory",
                        true,
                        "Memory per TaskManager Container with optional unit (default: MB)");
        slots =
                new Option(
                        shortPrefix + "s",
                        longPrefix + "slots",
                        true,
                        "Number of slots per TaskManager");
        dynamicproperties =
                Option.builder(shortPrefix + "D")
                        .argName("property=value")
                        .numberOfArgs(2)
                        .valueSeparator()
                        .desc("use value for given property")
                        .build();
        name =
                new Option(
                        shortPrefix + "nm",
                        longPrefix + "name",
                        true,
                        "Set a custom name for the application on YARN");
        applicationType =
                new Option(
                        shortPrefix + "at",
                        longPrefix + "applicationType",
                        true,
                        "Set a custom application type for the application on YARN");
        zookeeperNamespace =
                new Option(
                        shortPrefix + "z",
                        longPrefix + "zookeeperNamespace",
                        true,
                        "Namespace to create the Zookeeper sub-paths for high availability mode");
        nodeLabel =
                new Option(
                        shortPrefix + "nl",
                        longPrefix + "nodeLabel",
                        true,
                        "Specify YARN node label for the YARN application");
        help =
                new Option(
                        shortPrefix + "h",
                        longPrefix + "help",
                        false,
                        "Help for the Yarn session CLI.");

        allOptions = new Options();
        allOptions.addOption(flinkJar);
        allOptions.addOption(jmMemory);
        allOptions.addOption(tmMemory);
        allOptions.addOption(queue);
        allOptions.addOption(query);
        allOptions.addOption(shipPath);
        allOptions.addOption(slots);
        allOptions.addOption(dynamicproperties);
        allOptions.addOption(DETACHED_OPTION);
        allOptions.addOption ...
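The constructor builds every option name as `shortPrefix + name` and `longPrefix + name`. With the `"y"`/`"yarn"` prefixes passed in `loadCustomCommandLines`, `jm` becomes `-yjm` / `--yarnjobManagerMemory`, which keeps the YARN options apart from the options of the `flink run` command itself. The sketch below shows only that naming scheme with a plain `Map`; `PrefixedOptionsSketch` is a hypothetical name, and it does not use Apache Commons CLI, whose `Option` objects the real code builds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PrefixedOptionsSketch {

    /** Maps each short option name to its long option name, both carrying the CLI's prefix. */
    static Map<String, String> buildOptions(String shortPrefix, String longPrefix) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(shortPrefix + "q", longPrefix + "query");
        options.put(shortPrefix + "qu", longPrefix + "queue");
        options.put(shortPrefix + "jm", longPrefix + "jobManagerMemory");
        options.put(shortPrefix + "tm", longPrefix + "taskManagerMemory");
        options.put(shortPrefix + "s", longPrefix + "slots");
        options.put(shortPrefix + "nm", longPrefix + "name");
        return options;
    }

    public static void main(String[] args) {
        // Prefixes as used by loadCustomCommandLines for the YARN CLI
        buildOptions("y", "yarn").forEach(
                (shortName, longName) -> System.out.println("-" + shortName + ", --" + longName));
    }
}
```

As far as we know, `yarn-session.sh` creates the same class with empty prefixes, which is why it accepts `-jm` and `-tm` directly.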
