Flink1.15源码解析--启动JobManager----Dispatcher启动
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager----Dispatcher启动相关的知识,希望对你有一定的参考价值。
文章目录
一、前言
从上文 Flink1.15源码解析---- DispatcherResourceManagerComponent 我们知道ResourceManagerServiceImpl 的创建及启动大概流程
接下来我们详细的梳理一下
一、DispatcherRunner 创建
// org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
final HistoryServerArchivist historyServerArchivist =
HistoryServerArchivist.createHistoryServerArchivist(
configuration, webMonitorEndpoint, ioExecutor);
final DispatcherOperationCaches dispatcherOperationCaches =
new DispatcherOperationCaches(
configuration.get(RestOptions.ASYNC_OPERATION_STORE_DURATION));
final PartialDispatcherServices partialDispatcherServices =
new PartialDispatcherServices(
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
() ->
JobManagerMetricGroup.createJobManagerMetricGroup(
metricRegistry, hostname),
executionGraphInfoStore,
fatalErrorHandler,
historyServerArchivist,
metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
ioExecutor,
dispatcherOperationCaches);
log.debug("Starting Dispatcher.");
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
通过工厂 DefaultDispatcherRunnerFactory 创建 DispatcherRunner
// org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory#createDispatcherRunner
@Override
public DispatcherRunner createDispatcherRunner(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
JobPersistenceComponentFactory jobPersistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices)
throws Exception
final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =
dispatcherLeaderProcessFactoryFactory.createFactory(
jobPersistenceComponentFactory,
ioExecutor,
rpcService,
partialDispatcherServices,
fatalErrorHandler);
return DefaultDispatcherRunner.create(
leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);
1.1、LeaderElectionService 初始化
highAvailabilityServices.getDispatcherLeaderElectionService()
// org.apache.flink.runtime.highavailability.AbstractHaServices#getDispatcherLeaderElectionService
@Override
public LeaderElectionService getDispatcherLeaderElectionService()
return createLeaderElectionService(getLeaderPathForDispatcher());
highAvailabilityServices 就是 ClusterEntrypoint 初始化的HA服务, 以 ZooKeeperHaServices 为例 DefaultLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath)
return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
public static DefaultLeaderElectionService createLeaderElectionService(
final CuratorFramework client, final String path)
return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
以上是关于Flink1.15源码解析--启动JobManager----Dispatcher启动的主要内容,如果未能解决你的问题,请参考以下文章
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----Dispatcher启动