Flink 1.15 Source Code Analysis: ClusterEntrypoint
Posted 宝哥大数据[离职找工作中,大佬帮内推下]
Contents
- 1. Fields
- 2. Initialization
- 3. startCluster
- Back to [Flink 1.15 Source Code Analysis: Table of Contents](https://blog.csdn.net/wuxintdrh/article/details/127796678)
ClusterEntrypoint is the base class for Flink cluster entry points.
1. Fields
public static final ConfigOption<String> INTERNAL_CLUSTER_EXECUTION_MODE =
        ConfigOptions.key("internal.cluster.execution-mode")
                .stringType()
                .defaultValue(ExecutionMode.NORMAL.toString());
protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);
/** The lock to guard startup / shutdown / manipulation methods. */
private final Object lock = new Object();
private final Configuration configuration;
private final CompletableFuture<ApplicationStatus> terminationFuture;
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
@GuardedBy("lock")
private DeterminismEnvelope<ResourceID> resourceId;
/**
 * Wraps the three core components: Dispatcher, ResourceManager, and WebMonitorEndpoint.
 * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link
 * WebMonitorEndpoint} in the same process.
 */
@GuardedBy("lock")
private DispatcherResourceManagerComponent clusterComponent;
// The eight services ================================ start
@GuardedBy("lock")
private MetricRegistryImpl metricRegistry;
@GuardedBy("lock")
private ProcessMetricGroup processMetricGroup;
@GuardedBy("lock")
private HighAvailabilityServices haServices;
@GuardedBy("lock")
private BlobServer blobServer;
@GuardedBy("lock")
private HeartbeatServices heartbeatServices;
@GuardedBy("lock")
private RpcService commonRpcService;
@GuardedBy("lock")
private ExecutorService ioExecutor;
private ExecutionGraphInfoStore executionGraphInfoStore;
// The eight services ================================ end
private RpcSystem rpcSystem;
@GuardedBy("lock")
private DeterminismEnvelope<WorkingDirectory> workingDirectory;
private final Thread shutDownHook;
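The fields above combine two concurrency devices: a single `lock` object that guards the mutable services, and an `AtomicBoolean` flag (`isShutDown`). A minimal sketch of how such a pair is commonly used follows. It is plain Java, not Flink code; the class `GuardedLifecycleSketch` and all of its members are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration; not part of Flink. */
final class GuardedLifecycleSketch {
    private final Object lock = new Object();
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);

    // Guarded by "lock": only read or written while holding it.
    private String service;

    void start() {
        synchronized (lock) {
            service = "started";
        }
    }

    /** Returns true only for the first caller; later calls are no-ops. */
    boolean shutDown() {
        if (!isShutDown.compareAndSet(false, true)) {
            return false;
        }
        synchronized (lock) {
            service = null;
        }
        return true;
    }

    boolean isRunning() {
        synchronized (lock) {
            return service != null;
        }
    }

    public static void main(String[] args) {
        GuardedLifecycleSketch sketch = new GuardedLifecycleSketch();
        sketch.start();
        System.out.println(sketch.shutDown());
        System.out.println(sketch.shutDown());
    }
}
```

The atomic flag makes shutdown idempotent without holding the lock while deciding, and the lock keeps start-up and tear-down of the guarded field from interleaving.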
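1.1 DispatcherResourceManagerComponent: the three core components
1.2 The eight services
1.2.1 RpcService
An Akka-based RpcService implementation that wraps an ActorSystem. In effect it is a TCP RPC service; its default port is 6123.
1.2.2 JMXService
Starts a JMXService so that clients can connect to the JobManager JVM for monitoring.
1.2.3 ExecutorService
Starts a thread pool whose size is, by default, four times the number of CPU cores on the current node.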
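The sizing rule can be sketched in plain Java as below. This is an illustration under stated assumptions, not Flink's implementation: `IoPoolSketch`, `defaultPoolSize`, and `namedThreadFactory` are hypothetical names, and the thread naming scheme and daemon flag are choices made for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; not part of Flink. */
final class IoPoolSketch {
    /** Sizing rule described above: four threads per CPU core. */
    static int defaultPoolSize(int cpuCores) {
        return 4 * cpuCores;
    }

    /** Names threads "<prefix>-thread-N" and marks them as daemons (assumptions of this sketch). */
    static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ExecutorService createIoExecutor() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(
                defaultPoolSize(cores), namedThreadFactory("cluster-io"));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = createIoExecutor();
        // Run one task and print the name of the pool thread that executed it.
        System.out.println(io.submit(() -> Thread.currentThread().getName()).get());
        io.shutdown();
    }
}
```

A fixed pool that is larger than the core count suits blocking I/O work, where threads spend most of their time waiting rather than computing.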
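1.2.4 HighAvailabilityServices
Initializes the HA services, for example the ZooKeeper-based ZooKeeperHaServices. They give access to everything high availability requires: registries, distributed counters, and leader election.
1.2.5 BlobServer
Initializes the BlobServer, the server side of large-file (BLOB) storage. Large files here are, for example, the dependency JARs that have to be uploaded together with a Flink job's JAR, or log files uploaded by a TaskManager.
1.2.6 HeartbeatServices
Provides everything heartbeating requires, including the creation of heartbeat receivers and heartbeat senders.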
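To make the receiver side concrete, here is a minimal sketch of heartbeat timeout tracking in plain Java. It is not Flink's HeartbeatServices API: `HeartbeatSketch` and its methods are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not part of Flink. */
final class HeartbeatSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Receiver side: remember when the target last reported in. */
    void receiveHeartbeat(String targetId, long nowMillis) {
        lastHeartbeat.put(targetId, nowMillis);
    }

    /** A target is timed out if it is unknown or has been silent for longer than the timeout. */
    boolean isTimedOut(String targetId, long nowMillis) {
        Long last = lastHeartbeat.get(targetId);
        return last == null || nowMillis - last > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatSketch monitor = new HeartbeatSketch(50_000L);
        monitor.receiveHeartbeat("taskmanager-1", 0L);
        System.out.println(monitor.isTimedOut("taskmanager-1", 10_000L));
        System.out.println(monitor.isTimedOut("taskmanager-1", 60_000L));
    }
}
```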
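1.2.7 MetricRegistryImpl
Starts the metrics (performance monitoring) services. Internally it also starts an ActorSystem. It tracks all registered metrics and acts as the link between MetricGroup and MetricReporter.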
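The "link" role can be pictured with a small plain-Java sketch: groups register metrics with the registry, and the registry forwards each registration to every reporter. This is not the MetricRegistryImpl API; `MetricRegistrySketch` and its nested `Reporter` interface are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical illustration; not part of Flink. */
final class MetricRegistrySketch {
    /** Hypothetical reporter callback. */
    interface Reporter {
        void notifyOfAddedMetric(String name, Supplier<Long> gauge);
    }

    private final List<Reporter> reporters = new ArrayList<>();
    private final Map<String, Supplier<Long>> metrics = new LinkedHashMap<>();

    void addReporter(Reporter reporter) {
        reporters.add(reporter);
    }

    /** Called on behalf of a metric group: record the metric, then tell every reporter. */
    void register(String name, Supplier<Long> gauge) {
        metrics.put(name, gauge);
        for (Reporter reporter : reporters) {
            reporter.notifyOfAddedMetric(name, gauge);
        }
    }

    int size() {
        return metrics.size();
    }

    public static void main(String[] args) {
        MetricRegistrySketch registry = new MetricRegistrySketch();
        registry.addReporter((name, gauge) -> System.out.println(name + " = " + gauge.get()));
        registry.register("numRegisteredTaskManagers", () -> 3L);
    }
}
```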
1.2.8 ExecutionGraphInfoStore
Stores the serializable form of an ExecutionGraph. Note that this is not the JobGraphStore; the JobGraphStore is started when the Dispatcher starts.
2. Initialization
2.1 Creating the ClusterEntrypoint object
// The subclass creates the object
public StandaloneSessionClusterEntrypoint(Configuration configuration) {
    super(configuration);
}

// super(configuration) runs the base-class constructor
protected ClusterEntrypoint(Configuration configuration) {
    this.configuration = generateClusterConfiguration(configuration);
    this.terminationFuture = new CompletableFuture<>();
    if (configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE
            && !supportsReactiveMode()) {
        final String msg =
                "Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).";
        // log message as well, otherwise the error is only shown in the .out file of the
        // cluster
        LOG.error(msg);
        throw new IllegalConfigurationException(msg);
    }
    shutDownHook =
            ShutdownHookUtil.addShutdownHook(
                    () -> this.closeAsync().join(), getClass().getSimpleName(), LOG);
}
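The last statement of the constructor registers a shutdown hook that blocks on `closeAsync().join()`. The underlying JVM mechanism can be sketched in plain Java as follows. This is an illustration only: `ShutdownHookSketch` is a hypothetical class, it calls `Runtime.addShutdownHook` directly rather than Flink's ShutdownHookUtil, and its `closeAsync()` is a stand-in that completes immediately.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration; not part of Flink. */
final class ShutdownHookSketch {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    private final Thread shutDownHook;

    ShutdownHookSketch() {
        // The hook blocks JVM exit until the asynchronous cleanup has finished.
        shutDownHook =
                new Thread(() -> closeAsync().join(), getClass().getSimpleName() + "-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutDownHook);
    }

    /** Stand-in for real asynchronous cleanup; completes immediately in this sketch. */
    CompletableFuture<Void> closeAsync() {
        terminationFuture.complete(null);
        return terminationFuture;
    }

    /** Deregisters the hook; returns false if it was not registered or the JVM is already exiting. */
    boolean removeHook() {
        try {
            return Runtime.getRuntime().removeShutdownHook(shutDownHook);
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        new ShutdownHookSketch();
        System.out.println("main finished; the hook runs closeAsync() on JVM exit");
    }
}
```

Joining on the future inside the hook is what turns an asynchronous close into a synchronous one at the only moment when the process cannot wait any longer.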
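3. startCluster
3.1 Entering startCluster
- 1. Load the configuration
- 2. Install the security module; for details see Flink 1.15 Source Code Analysis: Security Modules and Security Context
- 3. Run the startup through the security context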
public void startCluster() throws ClusterEntrypointException {
    LOG.info("Starting {}.", getClass().getSimpleName());
    try {
        FlinkSecurityManager.setFromConfiguration(configuration);
        PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(configuration);
        configureFileSystems(configuration, pluginManager);
        SecurityContext securityContext = installSecurityContext(configuration);
        ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
        // Start the cluster through the security context
        securityContext.runSecured(
                (Callable<Void>)
                        () -> {
                            runCluster(configuration, pluginManager);
                            return null;
                        });
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        try {
            // clean up any partial state
            shutDownAsync(
                            ApplicationStatus.FAILED,
                            ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                            ExceptionUtils.stringifyException(strippedThrowable),
                            false)
                    .get(
                            INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
                            TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            strippedThrowable.addSuppressed(e);
        }
        throw new ClusterEntrypointException(
                String.format(
                        "Failed to initialize the cluster entrypoint %s.",
                        getClass().getSimpleName()),
                strippedThrowable);
    }
}
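The error path of startCluster follows a reusable pattern: run the startup, and if it fails, wait a bounded time for cleanup, attach any cleanup problem to the original error as a suppressed exception, then rethrow. A minimal plain-Java sketch of that pattern is shown below. `StartupCleanupSketch`, its `start` method, and the 200 ms timeout are hypothetical; the listing above waits up to 30 seconds and throws ClusterEntrypointException instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical illustration; not part of Flink. */
final class StartupCleanupSketch {
    // Short timeout chosen for this sketch; the listing above waits up to 30 seconds.
    static final long CLEANUP_TIMEOUT_MILLIS = 200L;

    /** Runs startup; on failure waits (bounded) for cleanup and rethrows the original error. */
    static void start(Callable<Void> startup, Supplier<CompletableFuture<Void>> cleanup) {
        try {
            startup.call();
        } catch (Throwable t) {
            try {
                cleanup.get().get(CLEANUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // The startup failure stays the primary error; the cleanup problem is attached.
                t.addSuppressed(e);
            }
            throw new IllegalStateException("Failed to initialize.", t);
        }
    }

    public static void main(String[] args) {
        try {
            // Startup fails and the cleanup future never completes, so the wait times out.
            start(
                    () -> {
                        throw new IllegalArgumentException("bad config");
                    },
                    () -> new CompletableFuture<>());
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getMessage());
            System.out.println(e.getCause().getSuppressed().length);
        }
    }
}
```

Using `addSuppressed` keeps the root cause at the top of the stack trace, which is usually what an operator reading the log needs first.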
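Starting through the security context and then calling runCluster is somewhat similar to what happens when a job is submitted with flink run.
Next, let's look at runCluster.
3.2 runCluster
runCluster mainly does the following:
- 1. Initialize the services
- 2. Write the JobManager's host and port into the configuration
- 3. Create the factory
- 4. Use the factory to create clusterComponent (which wraps the three core components) and start it
- 5. Handle the shutdown callback when the cluster terminates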
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        // 1. Initialize the services
        initializeServices(configuration, pluginManager);
        // write host information into configuration
        // 2. Write the JobManager's host and port into the configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        // 3. Create the factory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
        // 4. Use the factory to create clusterComponent (wraps the three core components) and start it
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
        // 5. Handle termination
        clusterComponent
                .getShutDownFuture()
                .whenComplete(
                        (ApplicationStatus applicationStatus, Throwable throwable) -> {
                            if (throwable != null) {
                                shutDownAsync(
                                        ApplicationStatus.UNKNOWN,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        ExceptionUtils.stringifyException(throwable),
                                        false);
                            } else {
                                // This is the general shutdown path. If a separate more
                                // specific shutdown was
                                // already triggered, this will do nothing
                                shutDownAsync(
                                        applicationStatus,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        null,
                                        true);
                            }
                        });
    }
}
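Step 5 decides which status the entrypoint shuts down with: UNKNOWN if the component's shutdown future failed, otherwise the status the future carried. The decision can be isolated in a few lines of plain Java. This is a sketch, not Flink code: `ShutdownFutureSketch` and its `Status` enum are hypothetical, and `handle` is used here to return the status, whereas the listing above uses `whenComplete` and calls shutDownAsync as a side effect.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration; not part of Flink. */
final class ShutdownFutureSketch {
    /** Hypothetical status values for this sketch. */
    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Maps the outcome of the component's shutdown future to the final status. */
    static CompletableFuture<Status> finalStatus(CompletableFuture<Status> shutDownFuture) {
        return shutDownFuture.handle(
                (status, throwable) -> throwable != null ? Status.UNKNOWN : status);
    }

    public static void main(String[] args) {
        CompletableFuture<Status> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("component crashed"));
        System.out.println(finalStatus(failed).join());
    }
}
```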
3.2.1 initializeServices: initializing the services
protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    LOG.info("Initializing cluster services.");
    synchronized (lock) {
        // 1. Generate the JobManager's resourceId; a random one is generated if none is configured
        resourceId =
                configuration
                        .getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
                        .map(
                                value ->
                                        DeterminismEnvelope.deterministicValue(
                                                new ResourceID(value)))
                        .orElseGet(
                                () ->
                                        DeterminismEnvelope.nondeterministicValue(
                                                ResourceID.generate()));
        LOG.debug(
                "Initialize cluster entrypoint {} with resource id {}.",
                getClass().getSimpleName(),
                resourceId);
        // 2. Create the JobManager's working directory
        workingDirectory =
                ClusterEntrypointUtils.createJobManagerWorkingDirectory(
                        configuration, resourceId);
        // Example output: Using working directory: WorkingDirectory(/tmp/jm_d5e86014d966cefbbac51289de71fecb).
        LOG.info("Using working directory: {}.", workingDirectory);
        // 3. Create the rpcSystem
        rpcSystem = RpcSystem.load(configuration);
        // 4. Initialize the rpcService
        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
        // 5. Start a JMXService so that clients can connect to the JobManager JVM for monitoring
        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
        // 6. Initialize the IO thread pool; by default its size is 4 * the number of CPU cores
        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        // 7. Create the HA services
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        // 8. Initialize the BlobServer, the server side of large-file storage, e.g. dependency
        // JARs uploaded together with a job's JAR, or log files uploaded by a TaskManager
        blobServer =
                BlobUtils.createBlobServer(
                        configuration,
                        Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
                        haServices.createBlobStore());
        blobServer.start();
        configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort()));
        // 9. Heartbeat services
        heartbeatServices = createHeartbeatServices(configuration);
        // 10. Start the metrics (performance monitoring) services
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);
        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration,
                        commonRpcService.getAddress(),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);
        // 11. processMetricGroup
        final String hostname = RpcUtils.getHostname(commonRpcService);
        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));
        // 12. Initialize the store for ExecutionGraphs; depending on the subclass this is a
        // FileExecutionGraphInfoStore or a MemoryExecutionGraphInfoStore
        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}
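Step 1 of the listing distinguishes a configured resource ID, which is the same on every restart, from a generated one, which is not. The `Optional.map(...).orElseGet(...)` idiom it relies on can be tried out in isolation with the plain-Java sketch below. `ResourceIdSketch` and its nested `Envelope` class are hypothetical stand-ins for DeterminismEnvelope and ResourceID, and the random UUID is an assumption made for illustration.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical illustration; not part of Flink. */
final class ResourceIdSketch {
    /** A value plus a flag saying whether it is reproducible across restarts. */
    static final class Envelope {
        final String value;
        final boolean deterministic;

        Envelope(String value, boolean deterministic) {
            this.value = value;
            this.deterministic = deterministic;
        }
    }

    /** Uses the configured ID if present; otherwise generates a random one. */
    static Envelope resolve(Optional<String> configured) {
        return configured
                .map(value -> new Envelope(value, true))
                .orElseGet(() -> new Envelope(UUID.randomUUID().toString(), false));
    }

    public static void main(String[] args) {
        Envelope fixed = resolve(Optional.of("jobmanager-1"));
        Envelope random = resolve(Optional.empty());
        System.out.println(fixed.value + " deterministic=" + fixed.deterministic);
        System.out.println(random.value + " deterministic=" + random.deterministic);
    }
}
```

Carrying the flag alongside the value lets later code, such as the working-directory setup, decide whether state keyed by the ID can be expected to survive a restart.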
3.2.1.7 Creating the HA services
// 7. Create the HA services
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#createHaServices
protected HighAvailabilityServices createHaServices(
        Configuration configuration, Executor executor, RpcSystemUtils rpcSystemUtils)
        throws Exception {
    return HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            AddressResolution.NO_ADDRESS_RESOLUTION,
            rpcSystemUtils,
            this);
}
A different HighAvailabilityServices implementation is created depending on the value of high-availability:
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution,
        RpcSystemUtils rpcSystemUtils,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
            final String resourceManagerRpcUrl =
                    rpcSystemUtils.getRpcUrl(
                            hostnamePort.f0,
                            hostnamePort.f1,
                            RpcServiceUtils.createWildcardName(
                                    ResourceManager.RESOURCE_MANAGER_NAME),
                            addressResolution,
                            configuration);
            final String dispatcherRpcUrl =
                    rpcSystemUtils.getRpcUrl(
                            hostnamePort.f0,
                            hostnamePort.f1,
                            RpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                            addressResolution,
                            configuration);
            final String webMonitorAddress =
                    getWebMonitorAddress(configuration, addressResolution);
            return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
        case FAC
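The dispatch on the HA mode can be illustrated with a simplified plain-Java sketch. It is not Flink's HighAvailabilityMode or HighAvailabilityServicesUtils: `HaModeSketch`, its `Mode` enum (including the catch-all `CUSTOM` value), the parsing rules in `fromConfig`, and the descriptive strings are all assumptions made for illustration.

```java
import java.util.Locale;

/** Hypothetical illustration; not part of Flink. */
final class HaModeSketch {
    /** Hypothetical modes for this sketch; CUSTOM is a catch-all for any other value. */
    enum Mode { NONE, ZOOKEEPER, CUSTOM }

    /** Parses a configuration value; a missing or blank value means no high availability. */
    static Mode fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Mode.NONE;
        }
        switch (value.trim().toUpperCase(Locale.ROOT)) {
            case "NONE":
                return Mode.NONE;
            case "ZOOKEEPER":
                return Mode.ZOOKEEPER;
            default:
                return Mode.CUSTOM;
        }
    }

    /** Picks a (descriptive) service implementation for the given mode. */
    static String describeServices(Mode mode) {
        switch (mode) {
            case NONE:
                return "standalone services with fixed RPC addresses";
            case ZOOKEEPER:
                return "ZooKeeper-backed services";
            default:
                return "custom services";
        }
    }

    public static void main(String[] args) {
        System.out.println(describeServices(fromConfig(null)));
        System.out.println(describeServices(fromConfig("zookeeper")));
    }
}
```

In the NONE branch of the listing, the addresses of the ResourceManager, Dispatcher, and web monitor are computed once from the configuration, which is why that mode needs no external coordination service.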