[flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, from Script to Main Class
Posted by 九师兄
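1. Overview
Reposted from "Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [一]" (Flink 1.12.2 source code walkthrough: yarn-per-job mode, part 1). This post is a supplementary column; please read the original article.
2. Preface
This post walks through the code path of yarn-per-job mode to see what it actually does.
II. Starting the Program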
2.1. Start the data server
First start the server that supplies the data:
nc -lk 9999
2.2. Submit the Flink job
Start a Flink job that consumes the data served on port 9999.
Run the command:
cd ${FLINK_HOME}
flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --port 9999
BoYi-Pro:flink-1.12.2 sysadmin$ flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --port 9999
2021-03-11 18:20:48,389 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/tools/flink-1.12.2/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-03-11 18:20:48,450 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at /0.0.0.0:8032
2021-03-11 18:20:48,634 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-03-11 18:20:48,818 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found
2021-03-11 18:20:48,820 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'.
2021-03-11 18:20:48,905 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2021-03-11 18:20:48,905 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2021-03-11 18:20:48,906 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2021-03-11 18:20:53,188 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1615446205104_0004
2021-03-11 18:20:53,284 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1615446205104_0004
2021-03-11 18:20:53,288 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2021-03-11 18:20:53,297 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2021-03-11 18:21:08,269 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2021-03-11 18:21:08,272 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface 192.168.8.188:62520 of application 'application_1615446205104_0004'.
Job has been submitted with JobID fed09b80388db7019598ef707fafe017
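The two memory lines in the log above describe a round-up to a multiple of the YARN minimum allocation. The sketch below only reproduces that arithmetic from the numbers in the log; the class and method names are made up for illustration and are not taken from Flink or YARN.

```java
class YarnMemoryRounding {

    /** Rounds requestedMb up to the nearest multiple of minAllocationMb. */
    static int roundUpToMultiple(int requestedMb, int minAllocationMb) {
        if (requestedMb <= 0 || minAllocationMb <= 0) {
            throw new IllegalArgumentException("Both values must be positive");
        }
        // Integer ceiling division, then scale back up to megabytes.
        return ((requestedMb + minAllocationMb - 1) / minAllocationMb) * minAllocationMb;
    }

    public static void main(String[] args) {
        // 1024 MB is the yarn.scheduler.minimum-allocation-mb value reported in the log.
        int minAllocationMb = 1024;
        // Configured JobManager and TaskManager process sizes from the log.
        int[] configuredMb = {1600, 1728};
        for (int mb : configuredMb) {
            int allocated = roundUpToMultiple(mb, minAllocationMb);
            System.out.println(mb + " MB -> " + allocated + " MB (extra " + (allocated - mb) + " MB)");
        }
    }
}
```

For 1600 MB and 1728 MB this gives 2048 MB in both cases, with 448 MB and 320 MB left over, which matches the figures in the log.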
The web UI page looks as follows:
III. The flink Command Entry Point
3.1. Submit command
Let's first look at the command that runs the job:
flink run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount examples/streaming/SocketWindowWordCount.jar --port 9999
The job is launched through flink run, and the script ultimately expands to the following command:
${JAVA_HOME}/bin/java
-Dlog.file=${FLINK_HOME}/log/flink-sysadmin-client-BoYi-Pro.local.log
-Dlog4j.configuration=file:${FLINK_HOME}/conf/log4j-cli.properties
-Dlog4j.configurationFile=file:${FLINK_HOME}/conf/log4j-cli.properties
-Dlogback.configurationFile=file:${FLINK_HOME}/conf/logback.xml
-classpath ${FLINK_HOME}/*.jar::/opt/tools/hadoop-3.2.1/etc/hadoop::/opt/tools/hbase-2.0.2/conf
org.apache.flink.client.cli.CliFrontend
run
-t yarn-per-job
-c org.apache.flink.streaming.examples.socket.SocketWindowWordCount
examples/streaming/SocketWindowWordCount.jar --port 9999
From this we can see that the entry point for job submission is:
org.apache.flink.client.cli.CliFrontend
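Three processes are involved afterwards:
CliFrontend
    Parse the arguments
    Wrap the CommandLine: three of them, added in order
    Wrap the configuration
    Execute the user code: execute()
    Generate the StreamGraph
    Executor: generate the JobGraph
    Cluster descriptor: upload the jars and configuration, and build the command submitted to YARN
    YarnClient submits the application
YarnJobClusterEntrypoint: the entry class executed by the AM
    1. Create and start the Dispatcher
    2. Create and start the ResourceManager, which contains a SlotManager (the component that actually manages resources and requests them from YARN)
    3. The Dispatcher starts the JobMaster, which generates the ExecutionGraph (it contains a SlotPool, the component that actually sends the requests)
    4. The SlotPool requests resources from the SlotManager, and the SlotManager requests resources from YARN (starting new nodes)
YarnTaskExecutorRunner: the TaskManager entry class in YARN mode
    1. Start the TaskExecutor
    2. Register its slots with the ResourceManager
    3. The ResourceManager allocates slots
    4. The TaskExecutor receives the allocation instruction and offers the slots to the JobMaster (SlotPool)
    5. The JobMaster submits tasks to the TaskExecutor for execution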
3.2. Preparing the debug environment
Add the following environment variables; otherwise an error is reported that FLINK_CONF_DIR cannot be found.
export FLINK_CONF_DIR=/opt/tools/flink-1.12.2/conf
export PATH=$PATH:$FLINK_CONF_DIR
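Before starting a debug session, it can help to confirm that the variable is visible to the JVM the IDE launches. The following is a minimal standalone sketch, not Flink's own lookup code; the class name, method name, and error messages are made up for illustration.

```java
import java.io.File;

class ConfDirCheck {

    /** Returns the directory named by envValue, or throws if it is unset or not a directory. */
    static File requireConfDir(String envValue) {
        if (envValue == null || envValue.isEmpty()) {
            throw new IllegalStateException("FLINK_CONF_DIR is not set");
        }
        File dir = new File(envValue);
        if (!dir.isDirectory()) {
            throw new IllegalStateException("FLINK_CONF_DIR does not point to a directory: " + envValue);
        }
        return dir;
    }

    public static void main(String[] args) {
        try {
            // Reads the same variable that the export line above sets.
            File dir = requireConfDir(System.getenv("FLINK_CONF_DIR"));
            System.out.println("Using configuration directory: " + dir.getAbsolutePath());
        } catch (IllegalStateException e) {
            System.out.println("Environment not ready: " + e.getMessage());
        }
    }
}
```

Running it from the same run configuration as the test class shows whether the IDE actually passes the variable through.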
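For debugging, the Hadoop dependency jars also need to be added to the classpath.
Otherwise an exception is thrown that HADOOP_CLASSPATH cannot be found.
If an error says the FlinkYarnSessionCli class cannot be found, add flink-dist_2.11-1.12.2.jar as well: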
/opt/tools/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar
3.3. Building a test entry class: CliFrontendTest
Create a class named CliFrontendTest under the test sources to drive the debugging session.
package boyi;

import org.apache.flink.client.cli.CliFrontend;

public class CliFrontendTest {
    public static void main(String[] args) {
        // Same arguments as the flink run command above, with an absolute jar path.
        String[] param = new String[8];
        param[0] = "run";
        param[1] = "-t";
        param[2] = "yarn-per-job";
        param[3] = "-c";
        param[4] = "org.apache.flink.streaming.examples.socket.SocketWindowWordCount";
        param[5] = "/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar";
        param[6] = "--port";
        param[7] = "9999";
        CliFrontend.main(param);
    }
}
IV. CliFrontend
4.1. CliFrontend#main
Start from the program's entry point:
/**
 * env:
 *
 * FLINK_CONF_DIR=/opt/tools/flink-1.12.2/conf;log.file=/opt/tools/flink-1.12.2/log/flink-sysadmin-client-BoYi-Pro.local.log ;log4j.configuration=file:/opt/tools/flink-1.12.2/conf/log4j-cli.properties ;log4j.configurationFile=file:/opt/tools/flink-1.12.2/conf/log4j-cli.properties ;logback.configurationFile=file:/opt/tools/flink-1.12.2/conf/logback.xml
 *
 * org.apache.flink.client.cli.CliFrontend
 * run -t yarn-per-job -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount /opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar --port 9999
 *
 * Submission function.
 * Submits the job based on the arguments.
 */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory, here /opt/tools/flink-1.12.2/conf
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration; values observed while debugging:
    // "taskmanager.memory.process.size" -> "1728m"
    // "parallelism.default" -> "1"
    // "jobmanager.execution.failover-strategy" -> "region"
    // "jobmanager.rpc.address" -> "localhost"
    // "taskmanager.numberOfTaskSlots" -> "1"
    // "jobmanager.memory.process.size" -> "1600m"
    // "jobmanager.rpc.port" -> "6123"
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    try {
        // Build the CliFrontend; command lines in order: GenericCLI > FlinkYarnSessionCli > DefaultCLI
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // Submit the command through parseAndRun
        int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
        System.exit(retCode);
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
        System.exit(31);
    }
}
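The ordering comment in main (GenericCLI > FlinkYarnSessionCli > DefaultCLI) matters because the first command line that reports itself active for the given arguments is the one used. The sketch below illustrates that "first active wins" idea with a hypothetical Candidate type; the activation rules are deliberately simplified assumptions and do not reproduce the real checks in Flink's classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class ActiveCommandLineSketch {

    /** Hypothetical stand-in for a custom command line: a name plus an activation check. */
    static final class Candidate {
        final String name;
        final Predicate<List<String>> isActive;

        Candidate(String name, Predicate<List<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    /** Candidates in the priority order from the comment in main; rules are simplified assumptions. */
    static List<Candidate> inPriorityOrder() {
        return Arrays.asList(
                new Candidate("GenericCLI", a -> a.contains("-t") || a.contains("--target")),
                new Candidate("FlinkYarnSessionCli", a -> a.contains("yarn-cluster")),
                new Candidate("DefaultCLI", a -> true));
    }

    /** Returns the name of the first candidate whose activation check accepts the arguments. */
    static String firstActive(List<Candidate> candidates, List<String> arguments) {
        for (Candidate candidate : candidates) {
            if (candidate.isActive.test(arguments)) {
                return candidate.name;
            }
        }
        throw new IllegalStateException("No command line is active for " + arguments);
    }

    public static void main(String[] args) {
        List<String> submitted = Arrays.asList("-t", "yarn-per-job", "-c", "SomeMainClass");
        System.out.println(firstActive(inPriorityOrder(), submitted));
    }
}
```

With the -t yarn-per-job arguments used in this post, the sketch selects GenericCLI, which is consistent with the GenericCLI instance seen in the debugger in section 4.3.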
4.2. CliFrontend#parseAndRun(args)
parseAndRun dispatches to a different method depending on the requested action.
Supported actions:
// actions
private static final String ACTION_RUN = "run";
private static final String ACTION_RUN_APPLICATION = "run-application";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
private static final String ACTION_SAVEPOINT = "savepoint";
The code is as follows:
/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseAndRun(String[] args) {

    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // do action
        switch (action) {
            case ACTION_RUN:
                // handle the run action
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                // handle the run-application action
                runApplication(params);
                return 0;
            case ACTION_LIST:
                // handle the list action
                list(params);
                return 0;
            case ACTION_INFO:
                // handle the info action
                info(params);
                return 0;
            case ACTION_CANCEL:
                // handle the cancel action
                cancel(params);
                return 0;
            case ACTION_STOP:
                // handle the stop action
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                // handle the savepoint action
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(
                        commitID.equals(EnvironmentInformation.UNKNOWN)
                                ? ""
                                : ", Commit ID: " + commitID);
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println(
                        "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println(
                        "Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println(
                        "Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}
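The core of parseAndRun is small: take the first argument as the action, pass the rest on as parameters, and switch on the action. The standalone sketch below shows just that pattern. It is not Flink code; the class name, the describe method, and the returned strings (apart from the two messages copied from the listing above) are made up so the behavior can be seen without a cluster.

```java
import java.util.Arrays;

class ActionDispatchSketch {

    /** Splits args into action and parameters, then reports which branch would run. */
    static String describe(String[] args) {
        if (args.length < 1) {
            return "Please specify an action.";
        }
        String action = args[0];
        // Everything after the action is handed to the handler unchanged.
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
            case "run-application":
            case "list":
            case "info":
            case "cancel":
            case "stop":
            case "savepoint":
                return action + " with " + params.length + " parameter(s)";
            default:
                return "\"" + action + "\" is not a valid action.";
        }
    }

    public static void main(String[] args) {
        String[] submitted = {"run", "-t", "yarn-per-job", "--port", "9999"};
        System.out.println(describe(submitted));
        System.out.println(describe(new String[] {"bogus"}));
    }
}
```

For the command used in this post, the action is run and everything after it is passed to run(params) untouched, which is why option parsing only starts in the next section.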
4.3. CliFrontend#run
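This method determines how Flink is to be executed (deployment target, environment, user program, and so on) and finally submits the job through executeProgram(effectiveConfiguration, program);.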
/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // Get the active command line for the submission mode: GenericCLI@2282
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    // jobJars : 0 = file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    // effectiveConfiguration = {Configuration@3429} "{taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=yarn-per-job, jobmanager.memory.process.size=1600m, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar], parallelism.default=1, task