Flink 1.15 Source Code Analysis: DispatcherResourceManagerComponent
Posted by 宝哥大数据
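Contents
- 1. Introduction
- 2. DispatcherResourceManagerComponentFactory
- 3. Creating and starting the clusterComponent (wrapper around the three core components)
- 3.1 Initializing monitoring (leader retrieval)
- 3.2 Building a thread pool that executes the client requests received by the WebMonitorEndpoint
- 3.3 Initializing the MetricFetcher (default refresh interval: 10 s)
- 3.4 Creating and starting a core component: WebMonitorEndpoint
- 3.5 Creating a core component: ResourceManagerService
- 3.6 Building a DispatcherRunner (not the Dispatcher itself; the Dispatcher is built and started inside the DispatcherRunner)
- 3.7 Starting a core component: ResourceManagerService
- 3.8 Starting monitoring of the ResourceManager and Dispatcher
- 3.9 Creating the DispatcherResourceManagerComponent
- Back to the [Flink 1.15 Source Code Analysis: Table of Contents](https://blog.csdn.net/wuxintdrh/article/details/127796678)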
This object wraps the three core components: Dispatcher, ResourceManager, and WebMonitorEndpoint.
1. Introduction
From "Flink 1.15 Source Code Analysis: ClusterEntrypoint" we know that the DispatcherResourceManagerComponent is created by the DispatcherResourceManagerComponentFactory, which also initializes and starts the three core components.
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster(Configuration configuration, PluginManager pluginManager)
// 3. Create the factory; subclasses implement createDispatcherResourceManagerComponentFactory
final DispatcherResourceManagerComponentFactory
        dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);
// 4. Use the factory to create the clusterComponent (wrapper around the three core components) and start it
clusterComponent =
        dispatcherResourceManagerComponentFactory.create(
                configuration,
                resourceId.unwrap(),
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                executionGraphInfoStore,
                new RpcMetricQueryServiceRetriever(
                        metricRegistry.getMetricQueryServiceRpcService()),
                this);
// 5. Shutdown handling
clusterComponent
        .getShutDownFuture()
        .whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                                ApplicationStatus.UNKNOWN,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                ExceptionUtils.stringifyException(throwable),
                                false);
                    } else {
                        // This is the general shutdown path. If a separate more
                        // specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                                applicationStatus,
                                ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                null,
                                true);
                    }
                });
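The shutdown handling in step 5 relies on `CompletableFuture.whenComplete`, which hands the callback either a result or an exception. Below is a minimal, JDK-only sketch of that branching. `ShutdownFutureSketch`, `Status`, and `describeShutdown` are hypothetical names chosen for illustration; they are not Flink classes, and the real `shutDownAsync` does considerably more.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the shutdown handling in runCluster; not Flink code. */
final class ShutdownFutureSketch {

    /** Illustrative status values; Flink's ApplicationStatus defines its own set. */
    enum Status { SUCCEEDED, FAILED, UNKNOWN }

    /** Attaches a callback that maps the future's outcome to a shutdown description. */
    static CompletableFuture<String> describeShutdown(CompletableFuture<Status> shutDownFuture) {
        CompletableFuture<String> description = new CompletableFuture<>();
        shutDownFuture.whenComplete(
                (status, throwable) -> {
                    if (throwable != null) {
                        // Failure path: the final status is unknown, so report the error.
                        description.complete(Status.UNKNOWN + ": " + throwable.getMessage());
                    } else {
                        // General path: forward the status reported by the component.
                        description.complete(status.toString());
                    }
                });
        return description;
    }

    public static void main(String[] args) {
        CompletableFuture<Status> ok = new CompletableFuture<>();
        CompletableFuture<String> okResult = describeShutdown(ok);
        ok.complete(Status.SUCCEEDED);
        System.out.println(okResult.join());

        CompletableFuture<Status> failed = new CompletableFuture<>();
        CompletableFuture<String> failedResult = describeShutdown(failed);
        failed.completeExceptionally(new IllegalStateException("component crashed"));
        System.out.println(failedResult.join());
    }
}
```

The callback is registered before the future completes, which mirrors how runCluster attaches its handler right after the component is created and only reacts once the component signals shutdown.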
2. DispatcherResourceManagerComponentFactory
DispatcherResourceManagerComponentFactory has only one implementation class, DefaultDispatcherResourceManagerComponentFactory.
org.apache.flink.runtime.entrypoint.ClusterEntrypoint#createDispatcherResourceManagerComponentFactory
// 3. Create the factory; subclasses implement createDispatcherResourceManagerComponentFactory
final DispatcherResourceManagerComponentFactory
        dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);
How createDispatcherResourceManagerComponentFactory is implemented depends on the ClusterEntrypoint subclass; DefaultDispatcherResourceManagerComponentFactory offers a matching creation method for each case.
2.1 Three ways to create a DefaultDispatcherResourceManagerComponentFactory
Each of them initializes the factories for the three core components.
// Factories for the three core components
@Nonnull private final DispatcherRunnerFactory dispatcherRunnerFactory;
@Nonnull private final ResourceManagerFactory<?> resourceManagerFactory;
@Nonnull private final RestEndpointFactory<?> restEndpointFactory;

// Application mode
public DefaultDispatcherResourceManagerComponentFactory( // factories for the three core components
        @Nonnull DispatcherRunnerFactory dispatcherRunnerFactory,
        @Nonnull ResourceManagerFactory<?> resourceManagerFactory,
        @Nonnull RestEndpointFactory<?> restEndpointFactory) {
    this.dispatcherRunnerFactory = dispatcherRunnerFactory;
    this.resourceManagerFactory = resourceManagerFactory;
    this.restEndpointFactory = restEndpointFactory;
}

// Session mode
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createSessionRunner(
                    SessionDispatcherFactory.INSTANCE),
            resourceManagerFactory,
            SessionRestEndpointFactory.INSTANCE);
}

// Per-job mode
public static DefaultDispatcherResourceManagerComponentFactory createJobComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobGraphRetriever) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createJobRunner(jobGraphRetriever),
            resourceManagerFactory,
            JobRestEndpointFactory.INSTANCE);
}
createDispatcherResourceManagerComponentFactory thus initializes the factory instances for the three core components.
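The pattern in section 2.1 is a composite factory: one object holds three sub-factories, and static creation methods pick the sub-factories that fit a deployment mode. The following JDK-only sketch models that idea with `Supplier<String>` standing in for the real factory types. `ComponentFactorySketch` and the strings it produces are assumptions made for illustration, not Flink's API.

```java
import java.util.function.Supplier;

/** Hypothetical, JDK-only model of a factory that bundles three sub-factories. */
final class ComponentFactorySketch {
    private final Supplier<String> dispatcherRunnerFactory;
    private final Supplier<String> resourceManagerFactory;
    private final Supplier<String> restEndpointFactory;

    /** General constructor: the caller supplies all three sub-factories. */
    ComponentFactorySketch(
            Supplier<String> dispatcherRunnerFactory,
            Supplier<String> resourceManagerFactory,
            Supplier<String> restEndpointFactory) {
        this.dispatcherRunnerFactory = dispatcherRunnerFactory;
        this.resourceManagerFactory = resourceManagerFactory;
        this.restEndpointFactory = restEndpointFactory;
    }

    /** Session variant: only the ResourceManager factory differs per deployment. */
    static ComponentFactorySketch createSessionComponentFactory(
            Supplier<String> resourceManagerFactory) {
        return new ComponentFactorySketch(
                () -> "session dispatcher runner",
                resourceManagerFactory,
                () -> "session REST endpoint");
    }

    /** Per-job variant: same shape, different dispatcher runner and REST endpoint. */
    static ComponentFactorySketch createJobComponentFactory(
            Supplier<String> resourceManagerFactory) {
        return new ComponentFactorySketch(
                () -> "job dispatcher runner",
                resourceManagerFactory,
                () -> "job REST endpoint");
    }

    /** Invokes the sub-factories: REST endpoint, then ResourceManager, then dispatcher runner. */
    String create() {
        return String.join(
                ", ",
                restEndpointFactory.get(),
                resourceManagerFactory.get(),
                dispatcherRunnerFactory.get());
    }

    public static void main(String[] args) {
        System.out.println(createSessionComponentFactory(() -> "standalone RM").create());
        System.out.println(createJobComponentFactory(() -> "YARN RM").create());
    }
}
```

Keeping the ResourceManager factory as the only parameter of the static methods reflects what the original signatures show: the deployment target (standalone, YARN, and so on) varies independently of the execution mode.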
3. Creating and starting the clusterComponent (wrapper around the three core components)
// 4. Use the factory to create the clusterComponent (wrapper around the three core components) and start it
clusterComponent =
        dispatcherResourceManagerComponentFactory.create(
                configuration,
                resourceId.unwrap(),
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                executionGraphInfoStore,
                new RpcMetricQueryServiceRetriever(
                        metricRegistry.getMetricQueryServiceRpcService()),
                this);
3.1 Initializing monitoring (leader retrieval)
// Monitors the Dispatcher
dispatcherLeaderRetrievalService =
        highAvailabilityServices.getDispatcherLeaderRetriever();
// Monitors the ResourceManager
resourceManagerRetrievalService =
        highAvailabilityServices.getResourceManagerLeaderRetriever();
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
        new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                new ExponentialBackoffRetryStrategy(
                        12, Duration.ofMillis(10), Duration.ofMillis(50)));
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
        new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                new ExponentialBackoffRetryStrategy(
                        12, Duration.ofMillis(10), Duration.ofMillis(50)));
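Both gateway retrievers are configured with an exponential backoff of 12 retries, an initial delay of 10 ms, and a maximum delay of 50 ms. To make those numbers concrete, here is a small sketch of the delay sequence such a strategy would produce, assuming the delay doubles after every attempt and is capped at the maximum. `BackoffSketch` is a hypothetical helper, not Flink's `ExponentialBackoffRetryStrategy`, and the doubling rule is my assumption about its behaviour.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a capped, doubling backoff; not Flink's implementation. */
final class BackoffSketch {

    /** Returns the delay before each retry, doubling from initial up to max. */
    static List<Duration> delays(int retries, Duration initial, Duration max) {
        List<Duration> result = new ArrayList<>();
        Duration current = initial;
        for (int i = 0; i < retries; i++) {
            result.add(current);
            Duration doubled = current.multipliedBy(2);
            // Cap the next delay at the configured maximum.
            current = doubled.compareTo(max) > 0 ? max : doubled;
        }
        return result;
    }

    /** Sums all delays, i.e. the total time spent waiting between retries. */
    static Duration total(List<Duration> delays) {
        Duration sum = Duration.ZERO;
        for (Duration delay : delays) {
            sum = sum.plus(delay);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Duration> delays = delays(12, Duration.ofMillis(10), Duration.ofMillis(50));
        for (Duration delay : delays) {
            System.out.println(delay.toMillis() + " ms");
        }
        System.out.println("total wait: " + total(delays).toMillis() + " ms");
    }
}
```

Under that assumption the sequence is 10, 20, 40 ms and then 50 ms for the remaining nine retries, so the retriever waits about half a second in total (520 ms) before giving up.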
3.2 Building a thread pool that executes the client requests received by the WebMonitorEndpoint
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");
3.3 Initializing the MetricFetcher (default refresh interval: 10 s)
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);
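The ternary above is a null-object selection: an update interval of 0 switches metric fetching off by installing a fetcher that does nothing, so callers never need a null check. The sketch below shows the idea with hypothetical types (`Fetcher`, `VoidFetcher`, `IntervalFetcher`). The throttling rule inside `IntervalFetcher`, which refreshes at most once per interval, is an assumption about how such an interval would typically be used, not a copy of `MetricFetcherImpl`.

```java
/** Hypothetical illustration of choosing a no-op fetcher when the interval is 0. */
final class MetricFetcherSketch {

    /** Minimal fetcher contract: returns true if this call actually refreshed. */
    interface Fetcher {
        boolean update(long nowMillis);
    }

    /** Null object: never refreshes, so metric fetching is effectively disabled. */
    enum VoidFetcher implements Fetcher {
        INSTANCE;

        @Override
        public boolean update(long nowMillis) {
            return false;
        }
    }

    /** Refreshes on the first call and then at most once per interval. */
    static final class IntervalFetcher implements Fetcher {
        private final long intervalMillis;
        private long lastUpdateMillis;
        private boolean updatedOnce;

        IntervalFetcher(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        @Override
        public boolean update(long nowMillis) {
            if (updatedOnce && nowMillis - lastUpdateMillis < intervalMillis) {
                return false; // Too early: keep serving the cached metrics.
            }
            updatedOnce = true;
            lastUpdateMillis = nowMillis;
            return true;
        }
    }

    /** Mirrors the ternary in the article: interval 0 selects the no-op fetcher. */
    static Fetcher select(long updateIntervalMillis) {
        return updateIntervalMillis == 0
                ? VoidFetcher.INSTANCE
                : new IntervalFetcher(updateIntervalMillis);
    }

    public static void main(String[] args) {
        Fetcher fetcher = select(10_000L);
        System.out.println(fetcher.update(0L));
        System.out.println(fetcher.update(5_000L));
        System.out.println(fetcher.update(10_000L));
        System.out.println(select(0L).update(10_000L));
    }
}
```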
3.4 Creating and starting a core component: WebMonitorEndpoint
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
3.5 Creating a core component: ResourceManagerService
final String hostname = RpcUtils.getHostname(rpcService);
// Create the core component resourceManagerService
resourceManagerService =
        ResourceManagerServiceImpl.create(
                resourceManagerFactory,
                configuration,
                resourceId,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                metricRegistry,
                hostname,
                ioExecutor);
3.6 Building a DispatcherRunner (not the Dispatcher itself; the Dispatcher is built and started inside the DispatcherRunner)
final HistoryServerArchivist historyServerArchivist =
        HistoryServerArchivist.createHistoryServerArchivist(
                configuration, webMonitorEndpoint, ioExecutor);
final DispatcherOperationCaches dispatcherOperationCaches =
        new DispatcherOperationCaches(
                configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
final PartialDispatcherServices partialDispatcherServices =
        new PartialDispatcherServices(
                configuration,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                () ->
                        JobManagerMetricGroup.createJobManagerMetricGroup(
                                metricRegistry, hostname),
                executionGraphInfoStore,
                fatalErrorHandler,
                historyServerArchivist,
                metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                ioExecutor,
                dispatcherOperationCaches);
log.debug("Starting Dispatcher.");
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
3.7 Starting a core component: ResourceManagerService
log.debug("Starting ResourceManagerService.");
resourceManagerService.start();
3.8 Starting monitoring of the ResourceManager and Dispatcher
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
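Starting a retrieval service with a gateway retriever as its argument is a listener registration: from then on the service tells the retriever whenever a leader is announced, and the retriever exposes the current leader as a future. The sketch below models that hand-off with JDK types only. `RetrievalService`, `GatewayRetriever`, `announceLeader`, and the address strings are hypothetical simplifications; the real Flink interfaces also carry a leader session ID and resolve an RPC gateway rather than a plain string.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of leader retrieval; not Flink's interfaces. */
final class LeaderRetrievalSketch {

    /** Callback invoked whenever a (new) leader address becomes known. */
    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress);
    }

    /** Exposes the current leader as a future that completes once a leader is known. */
    static final class GatewayRetriever implements LeaderListener {
        private CompletableFuture<String> leaderFuture = new CompletableFuture<>();

        @Override
        public synchronized void notifyLeaderAddress(String leaderAddress) {
            if (leaderFuture.isDone()) {
                // A later announcement replaces the previous leader.
                leaderFuture = CompletableFuture.completedFuture(leaderAddress);
            } else {
                // First announcement: wake up everyone waiting for a leader.
                leaderFuture.complete(leaderAddress);
            }
        }

        synchronized CompletableFuture<String> getLeaderFuture() {
            return leaderFuture;
        }
    }

    /** Forwards leader announcements to the listener passed to start(). */
    static final class RetrievalService {
        private LeaderListener listener;

        void start(LeaderListener listener) {
            this.listener = listener;
        }

        void announceLeader(String leaderAddress) {
            if (listener != null) {
                listener.notifyLeaderAddress(leaderAddress);
            }
        }
    }

    public static void main(String[] args) {
        GatewayRetriever retriever = new GatewayRetriever();
        RetrievalService service = new RetrievalService();
        service.start(retriever);
        service.announceLeader("dispatcher@host-a");
        System.out.println(retriever.getLeaderFuture().join());
    }
}
```

In this model, announcements made before `start` are simply dropped, which is one reason the retrieval services are started only after the components they watch have been created.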
3.9 Creating the DispatcherResourceManagerComponent
return new DispatcherResourceManagerComponent(
        dispatcherRunner,
        resourceManagerService,
        dispatcherLeaderRetrievalService,
        resourceManagerRetrievalService,
        webMonitorEndpoint,
        fatalErrorHandler,
        dispatcherOperationCaches);
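At this point the master node (the logical JobManager) has finished starting.
You may now be wondering how exactly the ResourceManager, WebMonitorEndpoint, and Dispatcher are started. Because a large amount of code is involved, I have split the analysis into three chapters, one for each core component. The next chapter starts with how the WebMonitorEndpoint is started.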