[Flink] Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode, the YARN Submission Process
Posted by 九师兄
1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode [Part 2]. Please refer to the original post.
Continuing from the previous article, Flink 1.12.2 Source Code Walkthrough: yarn-per-job Mode [Part 1].
The CliFrontend class ultimately invokes the code we wrote ourselves, entering through the main method of our entry class.
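(Recap of part one: CliFrontend wraps the user jar in a PackagedProgram and invokes its main method reflectively. The stripped-down sketch below illustrates that idea only; it is not the actual Flink implementation, and the jar path and entry class are simply the ones used in this article.)

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch only: load the user jar, locate the entry class, and
// invoke its static main(String[]) -- the essence of what CliFrontend /
// PackagedProgram do before the user code below ever runs.
public class InvokeUserMain {
    public static void main(String[] args) throws Exception {
        URL userJar = new URL("file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar");
        ClassLoader userLoader = new URLClassLoader(new URL[] {userJar});
        Class<?> entryClass = Class.forName(
                "org.apache.flink.streaming.examples.socket.SocketWindowWordCount", false, userLoader);
        Method mainMethod = entryClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) new String[] {"--port", "12345"});
    }
}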
Overall flow diagram:
Detailed diagram:
The complete example code is as follows:
package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println(
                    "No port specified. Please run 'SocketWindowWordCount "
                            + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                            + "and port is the address of the text server");
            System.err.println(
                    "To start a simple text server, run 'netcat -l <port>' and "
                            + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                new FlatMapFunction<String, WordWithCount>() {
                                    @Override
                                    public void flatMap(
                                            String value, Collector<WordWithCount> out) {
                                        for (String word : value.split("\\s")) {
                                            out.collect(new WordWithCount(word, 1L));
                                        }
                                    }
                                })
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce(
                                new ReduceFunction<WordWithCount>() {
                                    @Override
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                                        return new WordWithCount(a.word, a.count + b.count);
                                    }
                                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
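To reproduce the walkthrough below, such a job is typically submitted in yarn-per-job mode: start a text server with 'nc -l 12345', then run something along the lines of './bin/flink run -t yarn-per-job examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 12345' (the jar path matches the pipeline.jars entry in the configuration dump further below).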
2. Startup Analysis
2.1. StreamExecutionEnvironment#execute
Looking at the SocketWindowWordCount class, we can see that it simply counts words in 5-second windows.
As we all know, the final execution entry point of Flink user code is env.execute, so that is where we start.
2.1.1 StreamExecutionEnvironment#execute
Here we can see that execute(String) first builds a StreamGraph via getStreamGraph(jobName) and then hands it to execute(StreamGraph streamGraph) for execution.
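For context, getStreamGraph(jobName) itself delegates to a StreamGraphGenerator built over the transformations recorded on the environment. A condensed sketch (simplified, not the verbatim 1.12 source):

// Condensed sketch of getStreamGraph: translate the recorded Transformations
// into a StreamGraph via the environment's StreamGraphGenerator.
public StreamGraph getStreamGraph(String jobName) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    // by default the recorded transformations are cleared afterwards
    transformations.clear();
    return streamGraph;
}

The two execute overloads themselves: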
    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * have resulted in a "sink" operation. Sink operations are for example printing results or
     * forwarding them to a message queue.
     *
     * <p>The program execution will be logged and displayed with the provided name
     *
     * @param jobName Desired name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        // build the StreamGraph and continue with execute(StreamGraph)...
        return execute(getStreamGraph(jobName));
    }

    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * have resulted in a "sink" operation. Sink operations are for example printing results or
     * forwarding them to a message queue.
     *
     * @param streamGraph the stream graph representing the transformations
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception which occurs during job execution.
     */
    @Internal
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        // ----------------------
        // submit the StreamGraph built from the user code
        // ----------------------
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }
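Incidentally, the ATTACHED branch above has a direct user-facing counterpart: executeAsync returns the JobClient immediately, and blocking on its result future reproduces attached behavior. A minimal assumed usage sketch (user code in place of env.execute(...) in the example, not Flink internals):

// Assumed user-side sketch: submit asynchronously, then decide whether to wait.
JobClient jobClient = env.executeAsync("Socket Window WordCount");
System.out.println("Submitted with JobID " + jobClient.getJobID());
// blocking here mirrors the DeploymentOptions.ATTACHED branch of execute(StreamGraph)
JobExecutionResult result = jobClient.getJobExecutionResult().get();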
Next, we follow executeAsync(streamGraph).
2.2. StreamExecutionEnvironment#executeAsync
This method goes through the PipelineExecutorFactory mechanism to obtain the matching YarnJobClusterExecutorFactory,
which in turn creates the executor that does the actual work, YarnJobClusterExecutor.
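The factory itself is resolved via Java SPI: DefaultExecutorServiceLoader walks all registered PipelineExecutorFactory implementations and picks the one compatible with execution.target. A condensed, illustrative sketch of that lookup (simplified, using the 1.12 interface names):

import java.util.ServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

// Illustrative sketch of the SPI lookup (simplified from DefaultExecutorServiceLoader):
static PipelineExecutor findExecutor(Configuration configuration) {
    for (PipelineExecutorFactory factory :
            ServiceLoader.load(PipelineExecutorFactory.class)) {
        // for execution.target=yarn-per-job this matches YarnJobClusterExecutorFactory
        if (factory.isCompatibleWith(configuration)) {
            // the factory then hands out the YarnJobClusterExecutor
            return factory.getExecutor(configuration);
        }
    }
    throw new IllegalStateException(
            "No PipelineExecutorFactory found for " + configuration.get(DeploymentOptions.TARGET));
}

Once the executor is selected, the following execute method runs (in 1.12 it is implemented in AbstractJobClusterExecutor, the base class of YarnJobClusterExecutor):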
    @Override
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {

        // convert the stream graph into a job graph
        // JobGraph(jobId: 536af83b56ddfc2ef4ffda8b43a21e15)
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

        // obtain the yarn cluster descriptor
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(configuration)) {

            // {
            //     taskmanager.memory.process.size=1728m,
            //     jobmanager.execution.failover-strategy=region,
            //     jobmanager.rpc.address=localhost,
            //     execution.target=yarn-per-job,
            //     jobmanager.memory.process.size=1600m,
            //     jobmanager.rpc.port=6123,
            //     execution.savepoint.ignore-unclaimed-state=false,
            //     execution.attached=true,
            //     execution.shutdown-on-attached-exit=false,
            //     pipeline.jars=[file:/opt/tools/flink-1.12.2/examples/streaming/SocketWindowWordCount.jar],
            //     parallelism.default=1,
            //     taskmanager.numberOfTaskSlots=1,
            //     pipeline.classpaths=[],
            //     $internal.deployment.config-dir=/opt/tools/flink-1.12.2/conf,
            //     $internal.yarn.log-config-file=/opt/tools/flink-1.12.2/conf/log4j.properties
            // }
            final ExecutionConfigAccessor configAccessor =
                    ExecutionConfigAccessor.fromConfiguration(configuration);

            // get the requested resource specification
            //     masterMemoryMB      = 1600
            //     taskManagerMemoryMB = 1728
            //     slotsPerTaskManager = 1
            final ClusterSpecification clusterSpecification =
                    clusterClientFactory.getClusterSpecification(configuration);

            // deploy the cluster....
            final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.deployJobCluster(
                            clusterSpecification, jobGraph, configAccessor.getDetachedMode());

            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(
                            clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
        }
    }
2.3. YarnClusterDescriptor#deployJobCluster
This simply calls the deployInternal method to start deploying the yarn per-job cluster.
    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            // start deploying the yarn per-job cluster
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    // this is the class name of the yarn cluster entry point !!!!!!
                    getYarnJobClusterEntrypoint(),
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }
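Note the third argument above: getYarnJobClusterEntrypoint() returns the fully qualified class name of YarnJobClusterEntrypoint, which later becomes the main class of the YARN ApplicationMaster. In 1.12 it is essentially just:

// sketch of the 1.12 helper: the AM main class for per-job mode
protected String getYarnJobClusterEntrypoint() {
    return YarnJobClusterEntrypoint.class.getName();
}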
2.4. YarnClusterDescriptor#deployInternal [core]
    /**
     * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
     *
     * @param clusterSpecification Initial cluster specification for the Flink cluster to be
     *     deployed
     * @param applicationName name of the Yarn application to start
     * @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
     * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
     * @param detached True if the cluster should be started in detached mode
     */
    private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached)
            throws Exception {

        // security configuration related.....
        // currentUser: sysadmin (auth:SIMPLE)
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
            boolean useTicketCache =
                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                throw new RuntimeException(
                        "Hadoop security with Kerberos is enabled but the login user "
                                + "does not have Kerberos credentials or delegation tokens!");
            }
        }

        // run a series of checks: is everything needed to start the cluster in place (jar, conf, yarn core)?
        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        // check whether the requested queue exists...
        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources
        // --------------

        // build the application client
        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();

        // get the response:
        // application_id {
        //     id: 10
        //     cluster_timestamp: 1615446205104
        // }
        // maximumCapability {
        //     memory: 8192
        //     virtual_cores: 4
        //     resource_value_map {
        //         key: "memory-mb"
        //         value: 8192
        //         units: "Mi"
        //         type: COUNTABLE
        //     }
        //     resource_value_map {
        //         key: "vcores"
        //         value: 4
        //         units: ""
        //         type: COUNTABLE
        //     }
        // }
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        // get the maximum resources of the yarn cluster...
        // <memory:8192, vCores:4>
        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            // get the free cluster memory....
            // freeClusterMem :
            //     totalFreeMemory  = 104857600
            //     containerLimit   = 104857600
            //     nodeManagersFree = {int[1]@4603}
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException(
                    "Could not retrieve information about free cluster resources.", e);
        }

        // get yarn's minimum allocation size, 1024 MB by default
        final int yarnMinAllocationMB =
                yarnConfiguration.getInt(
                        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
                        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);

        // if the minimum allocation memory is <= 0, throw an exception....
        if (yarnMinAllocationMB <= 0) {
            throw new YarnDeploymentException(
                    "The minimum allocation memory "
                            + "("
                            + yarnMinAllocationMB
                            + " MB) configured via '"
                            + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
                            + "' should be greater than 0.");
        }

        final ClusterSpecification validClusterSpecification;
        try {
            // validate that the requested resources can actually be satisfied...
            validClusterSpecification =
                    validateClusterResources(
                            clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        // determine the configured execution mode... : NORMAL
        final ClusterEntrypoint.ExecutionMode executionMode =
                detached
                        ? ClusterEntrypoint.ExecutionMode.DETACHED
                        : ClusterEntrypoint.ExecutionMode.NORMAL;

        // store the execution mode: internal.cluster.execution-mode
        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        // start the AppMaster and collect the application report
        ApplicationReport report =
                startAppMaster(
                        flinkConfiguration,
                        applicationName,
                        yarnClusterEntrypoint,
                        jobGraph,
                        yarnClient,
                        yarnApplication,
                        validClusterSpecification);

        // print the application id, mainly so the user can cancel the job...
        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        // write the cluster entry point info back into the configuration...
        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                // build the cluster client to hand back...
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
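One detail worth calling out: YARN's scheduler effectively rounds every container request up to a multiple of yarn.scheduler.minimum-allocation-mb, which is why deployInternal reads yarnMinAllocationMB and validates the requested sizes against it. A small illustrative helper (not Flink source) showing the effect:

// Illustrative only: round a memory request up to the next multiple of the
// minimum allocation, as the YARN scheduler does internally.
static int roundUpToMinAllocation(int requestedMB, int yarnMinAllocationMB) {
    return ((requestedMB + yarnMinAllocationMB - 1) / yarnMinAllocationMB) * yarnMinAllocationMB;
}
// e.g. roundUpToMinAllocation(1600, 1024) == 2048: the 1600 MB JobManager
// request from the cluster specification ends up in a 2048 MB container.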
2.5. YarnClusterDescriptor#startAppMaster
Submit the application and start the AppMaster.
    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)
            throws Exception {

        // ------------------ Initialize the file systems -------------------------
        // initialize the file systems
        org.apache.flink.core.fs.FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        // get the file system: LocalFileSystem in this local setup
        final FileSystem fs = FileSystem.get(yarnConfiguration);

        // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
        if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
                && fs.getScheme().startsWith("file")) {
            LOG.warn(
                    "The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
                            + "specified Hadoop configuration path is wrong and the system is "
                            + "using the default Hadoop configuration values. The Flink YARN "
                            + "client needs to store its files in a distributed file system");
        }
        // ... (rest of startAppMaster omitted)
    }
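As a closing aside, it is easy to check locally which file system a Hadoop configuration resolves to, which is exactly what the warning above keys on. A tiny illustrative sketch using the standard Hadoop API (not Flink source):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SchemeCheck {
    public static void main(String[] args) throws Exception {
        // with the default fs.defaultFS (file:///) this prints a "file" scheme,
        // the very condition that triggers the warning in startAppMaster
        FileSystem fs = FileSystem.get(new Configuration());
        System.out.println(fs.getClass().getSimpleName() + " -> " + fs.getScheme());
    }
}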