Flink1.15源码解析--启动JobManager----ResourceManager启动
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink1.15源码解析--启动JobManager----ResourceManager启动相关的知识,希望对你有一定的参考价值。
一、前言
从上文 Flink1.15源码解析---- DispatcherResourceManagerComponent 我们知道ResourceManagerServiceImpl 的创建及启动
二、ResourceManagerServiceImpl 创建
//org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
resourceManagerService =
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
// org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl#create
public static ResourceManagerServiceImpl create(
ResourceManagerFactory<?> resourceManagerFactory,
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
String hostname,
Executor ioExecutor)
throws ConfigurationException
return new ResourceManagerServiceImpl(
resourceManagerFactory,
resourceManagerFactory.createResourceManagerProcessContext(
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
metricRegistry,
hostname,
ioExecutor));
private ResourceManagerServiceImpl(
ResourceManagerFactory<?> resourceManagerFactory,
ResourceManagerProcessContext rmProcessContext)
this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
this.rmProcessContext = checkNotNull(rmProcessContext);
this.leaderElectionService =
rmProcessContext
.getHighAvailabilityServices()
.getResourceManagerLeaderElectionService();
this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
this.ioExecutor = rmProcessContext.getIoExecutor();
this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
this.serviceTerminationFuture = new CompletableFuture<>();
this.running = false;
// ResourceManager
this.leaderResourceManager = null;
this.leaderSessionID = null;
this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();
1.1、LeaderElectionService 初始化
this.leaderElectionService =
rmProcessContext
.getHighAvailabilityServices()
.getResourceManagerLeaderElectionService();
// org.apache.flink.runtime.resourcemanager.ResourceManagerProcessContext#getHighAvailabilityServices
public HighAvailabilityServices getHighAvailabilityServices()
return highAvailabilityServices;
// org.apache.flink.runtime.highavailability.AbstractHaServices#getResourceManagerLeaderElectionService
// 以 AbstractHaServices 为例, 查看子类实例
@Override
public LeaderElectionService getResourceManagerLeaderElectionService()
return createLeaderElectionService(getLeaderPathForResourceManager());
// org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath)
return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
final CuratorFramework client, final String path)
return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
highAvailabilityServices 就是 ClusterEntrypoint初始化的HA服务, 以 ZooKeeperHaServices 为例, 创建 DefaultLeaderElectionService
二、ResourceManagerServiceImpl 启动
// org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory#create
log.debug("Starting ResourceManagerService.");
resourceManagerService.start();
实际是
@Override
public void start() throws Exception
synchronized (lock)
if (running)
LOG.debug("Resource manager service has already started.");
return;
running = true;
LOG.info("Starting resource manager service.");
· // 调用 DefaultLeaderElectionService.start, 传入 ResourceManagerServiceImpl
leaderElectionService.start(this);
接下来就是选举服务了, 和 WebMonitorEndpoint选举流程一样
以上是关于Flink1.15源码解析--启动JobManager----ResourceManager启动的主要内容,如果未能解决你的问题,请参考以下文章
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----ResourceManager启动
Flink1.15源码解析--启动JobManager----Dispatcher启动