Flink 1.15 Source Code Analysis: Starting the JobManager: WebMonitorEndpoint Startup
Posted 宝哥大数据[离职找工作中,大佬帮内推下]
Contents
- 1. Preface
- 2. Building the WebMonitorEndpoint
- 3. Starting the WebMonitorEndpoint
- 4. Summary
- Back to [Flink 1.15 Source Code Analysis: Table of Contents](https://blog.csdn.net/wuxintdrh/article/details/127796678)
1. Preface
From the previous article, Flink 1.15 Source Code Analysis: DispatcherResourceManagerComponent, we know that the WebMonitorEndpoint is created and started in:
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
// Create a thread pool that executes the requests the WebMonitorEndpoint receives from clients
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

// Initialize the MetricFetcher; the default update interval is 10 s
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);

// Create the WebMonitorEndpoint, one of the three main components
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

// Start the WebMonitorEndpoint, one of the three main components
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
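The first step above creates a dedicated thread pool from a thread count, a thread priority and a name. The body of WebMonitorEndpoint.createExecutorService is not shown in this article, so the following is only a minimal JDK-only sketch of what such a helper could look like. The class name RestExecutorSketch and the daemon-thread choice are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper; not Flink's WebMonitorEndpoint.createExecutorService. */
final class RestExecutorSketch {

    /** Creates a scheduled pool whose threads carry a name prefix and a fixed priority. */
    static ScheduledExecutorService createExecutorService(
            int numThreads, int threadPriority, String componentName) {
        if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("Invalid thread priority: " + threadPriority);
        }
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory =
                runnable -> {
                    Thread t =
                            new Thread(runnable, componentName + "-" + counter.incrementAndGet());
                    t.setPriority(threadPriority);
                    t.setDaemon(true); // assumption: request threads should not block JVM exit
                    return t;
                };
        return Executors.newScheduledThreadPool(numThreads, factory);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor =
                createExecutorService(4, Thread.NORM_PRIORITY, "DispatcherRestEndpoint");
        // Run one task on the pool and report which thread handled it.
        Callable<String> task = () -> Thread.currentThread().getName();
        System.out.println(executor.submit(task).get(5, TimeUnit.SECONDS));
        executor.shutdownNow();
    }
}
```

Naming the threads after the component makes REST request handling easy to spot in thread dumps, which is presumably why the name "DispatcherRestEndpoint" is passed in.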
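This article walks through the construction and startup of the WebMonitorEndpoint in detail.
2. Building the WebMonitorEndpoint
The WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used depends on the startup mode; it is set when the DispatcherResourceManagerComponentFactory is created.
Below we take StandaloneSessionClusterEntrypoint as an example to see how restEndpointFactory is initialized.
2.1 Initializing restEndpointFactory
1. StandaloneSessionClusterEntrypoint creates a DefaultDispatcherResourceManagerComponentFactory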
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory
@Override
protected DefaultDispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
            StandaloneResourceManagerFactory.getInstance());
}
2. createSessionComponentFactory creates the factories for the three main components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createSessionRunner(
                    SessionDispatcherFactory.INSTANCE),
            resourceManagerFactory,
            SessionRestEndpointFactory.INSTANCE);
}
So in session mode, restEndpointFactory is SessionRestEndpointFactory.INSTANCE.
2.2 createRestEndpoint creates the WebMonitorEndpoint
SessionRestEndpointFactory, a RestEndpointFactory, creates a DispatcherRestEndpoint:
/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);

        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}
The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher:
/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}
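SessionRestEndpointFactory is declared as an enum with a single INSTANCE constant, which is the usual Java way to get a stateless singleton factory. The sketch below shows the same wiring in miniature. All type names in it (SketchEndpoint, SketchEndpointFactory, and so on) are hypothetical stand-ins, not Flink classes.

```java
/** Hypothetical stand-in for the endpoint type. */
interface SketchEndpoint {
    String describe();
}

/** Hypothetical stand-in for RestEndpointFactory. */
interface SketchEndpointFactory {
    SketchEndpoint createEndpoint(String bindAddress);
}

/** Enum singleton: the JVM guarantees that exactly one INSTANCE exists. */
enum SessionEndpointFactorySketch implements SketchEndpointFactory {
    INSTANCE;

    @Override
    public SketchEndpoint createEndpoint(String bindAddress) {
        return () -> "session endpoint at " + bindAddress;
    }
}

/** Holds the factory chosen for a startup mode, like the component factory does. */
final class ComponentFactorySketch {
    private final SketchEndpointFactory endpointFactory;

    private ComponentFactorySketch(SketchEndpointFactory endpointFactory) {
        this.endpointFactory = endpointFactory;
    }

    /** Session mode picks the session endpoint factory. */
    static ComponentFactorySketch createSessionComponentFactory() {
        return new ComponentFactorySketch(SessionEndpointFactorySketch.INSTANCE);
    }

    SketchEndpoint create(String bindAddress) {
        return endpointFactory.createEndpoint(bindAddress);
    }

    public static void main(String[] args) {
        System.out.println(createSessionComponentFactory().create("localhost").describe());
    }
}
```

Because the startup mode only decides which factory instance is injected, the code that later calls the factory does not need to know which mode it is running in.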
3. Starting the WebMonitorEndpoint
The call actually lands in org.apache.flink.runtime.rest.RestServerEndpoint#start
3.1 Router
// 1. First create the Router, which parses client requests and finds the matching handler
final Router router = new Router();
3.2 Registering the handlers
// 2. Register the handlers
// 2.1 Initialize the handlers
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);

// 2.2 Sort the handlers so that each URL resolves to exactly one handler
/* sort the handlers such that they are ordered the following:
 * /jobs
 * /jobs/overview
 * /jobs/:jobid
 * /jobs/:jobid/config
 * /:*
 */
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

// 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies uniqueness
checkAllEndpointsAndHandlersAreUnique(handlers);

// 2.4 Register the handlers with the router
handlers.forEach(handler -> registerHandler(router, handler, log));
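To make the ordering in step 2.2 concrete, here is a simplified comparator that reproduces the order listed in the comment above: static path segments first, then ":param" segments, then the ":*" wildcard, with shorter prefixes ahead of longer paths. It is an illustrative stand-in written for this article, not the logic of RestHandlerUrlComparator, and the uniqueness check on "METHOD url" strings is likewise a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified stand-in for the ordering and uniqueness checks; not Flink's implementation. */
final class HandlerOrderSketch {

    /** Static segments sort before ":param" segments, which sort before the ":*" wildcard. */
    static final Comparator<String> URL_ORDER =
            (a, b) -> {
                List<String> sa = segments(a);
                List<String> sb = segments(b);
                int common = Math.min(sa.size(), sb.size());
                for (int i = 0; i < common; i++) {
                    int ra = rank(sa.get(i));
                    int rb = rank(sb.get(i));
                    if (ra != rb) {
                        return Integer.compare(ra, rb);
                    }
                    if (ra == 0) {
                        int byName = sa.get(i).compareTo(sb.get(i));
                        if (byName != 0) {
                            return byName;
                        }
                    }
                }
                return Integer.compare(sa.size(), sb.size()); // shorter prefix first
            };

    private static int rank(String segment) {
        if (":*".equals(segment)) {
            return 2; // wildcard matches anything, so it must come last
        }
        return segment.startsWith(":") ? 1 : 0;
    }

    private static List<String> segments(String url) {
        List<String> result = new ArrayList<>();
        for (String s : url.split("/")) {
            if (!s.isEmpty()) {
                result.add(s);
            }
        }
        return result;
    }

    /** Fails if the same "METHOD url" pair is registered twice. */
    static void checkUnique(List<String> methodAndUrls) {
        Set<String> seen = new HashSet<>();
        for (String key : methodAndUrls) {
            if (!seen.add(key)) {
                throw new IllegalStateException("Duplicate REST handler for " + key);
            }
        }
    }

    public static void main(String[] args) {
        List<String> urls =
                new ArrayList<>(
                        Arrays.asList(
                                "/:*",
                                "/jobs/:jobid/config",
                                "/jobs/:jobid",
                                "/jobs/overview",
                                "/jobs"));
        urls.sort(URL_ORDER);
        System.out.println(urls);
        checkUnique(Arrays.asList("GET /jobs", "GET /jobs/overview"));
    }
}
```

Placing /jobs/overview ahead of /jobs/:jobid matters because a router that tries routes in order would otherwise treat "overview" as a job ID.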
3.3 Netty startup
3.3.1 Initializing the ChannelInitializer
// 3.1 Initialize the ChannelInitializer
ChannelInitializer<SocketChannel> initializer =
        new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws ConfigurationException {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline()
                            .addLast(
                                    "ssl",
                                    new RedirectingSslHandler(
                                            restAddress,
                                            restAddressFuture,
                                            sslHandlerFactory));
                }

                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(
                                new FlinkHttpObjectAggregator(
                                        maxContentLength, responseHeaders));

                for (InboundChannelHandlerFactory factory :
                        inboundChannelHandlerFactories) {
                    Optional<ChannelHandler> channelHandler =
                            factory.createHandler(configuration, responseHeaders);
                    if (channelHandler.isPresent()) {
                        ch.pipeline().addLast(channelHandler.get());
                    }
                }

                ch.pipeline()
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };
3.3.2 Initializing the NioEventLoopGroups
NioEventLoopGroup bossGroup =
        new NioEventLoopGroup(
                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
NioEventLoopGroup workerGroup =
        new NioEventLoopGroup(
                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

bootstrap = new ServerBootstrap();
bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(initializer);
3.3.3 Binding the REST endpoint
// 3.3 Binding the REST endpoint
// 3.3.1 Get the range of candidate ports
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException(
            "Invalid port range definition: " + restBindPortRange);
}

// 3.3.2 Handle port conflicts: try the candidate ports one by one until one can be bound
int chosenPort = 0;
while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        // Bind to the address and port to obtain the channel
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}

if (serverChannel == null) {
    throw new BindException(
            "Could not start rest endpoint on any port in port range "
                    + restBindPortRange);
}

log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
    advertisedAddress = this.restAddress;
} else {
    advertisedAddress = bindAddress.getAddress().getHostAddress();
}

port = bindAddress.getPort();

log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
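The retry loop in 3.3.2 can be tried out without Netty. The sketch below applies the same idea to a plain java.net.ServerSocket: walk the candidate ports, skip a port when binding fails with BindException, and let any other failure propagate. The class PortRangeBindSketch and its method are hypothetical, and port 0 ("let the OS choose") is used in the demo only to guarantee that the second candidate is free.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

/** Hypothetical JDK-only illustration of the bind-with-fallback loop; not Flink code. */
final class PortRangeBindSketch {

    /** Tries each candidate port in order and returns the first socket that binds. */
    static ServerSocket bindFirstFree(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return new ServerSocket(port);
            } catch (BindException e) {
                // Port is in use: try the next candidate.
                // Any other IOException is not caught here and fails early.
            }
        }
        throw new BindException("Could not bind to any port in the given range");
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket occupied = new ServerSocket(0)) { // occupy some free port
            int taken = occupied.getLocalPort();
            // The first candidate is already taken, so the loop falls through to the second.
            try (ServerSocket server = bindFirstFree(Arrays.asList(taken, 0).iterator())) {
                System.out.println("taken=" + taken + ", bound=" + server.getLocalPort());
            }
        }
    }
}
```

Only "port already in use" is treated as recoverable; a misconfigured bind address, for example, should stop the startup instead of silently burning through the whole range.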
3.3.4 Completing restAddressFuture once the address is known
restAddressFuture.complete(restBaseUrl);
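The handlers were initialized in 3.2 with restAddressFuture, well before the server was bound, and the future is only completed here. A small sketch of why a CompletableFuture fits this ordering: consumers can be wired up first and receive the base URL later. The class and method names, and the example URL, are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration; the class and method names are not Flink APIs. */
final class AddressFutureSketch {

    /** A consumer that needs the base URL but is created before the server is bound. */
    static CompletableFuture<String> redirectTarget(
            CompletableFuture<String> restAddressFuture, String path) {
        return restAddressFuture.thenApply(baseUrl -> baseUrl + path);
    }

    public static void main(String[] args) {
        CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        // Step 1: the consumer is wired up while the address is still unknown.
        CompletableFuture<String> target = redirectTarget(restAddressFuture, "/jobs/overview");
        System.out.println("resolved before bind: " + target.isDone());

        // Step 2: after a successful bind, the endpoint publishes its base URL.
        restAddressFuture.complete("http://localhost:8081");
        System.out.println(target.join());
    }
}
```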
3.3.5 Set the endpoint state to RUNNING. At this point the Netty server of the WebMonitorEndpoint is fully started.
state = State.RUNNING;
3.4 Hook for starting subclass-specific services
/**
* Hook to start sub class specific services.
*
* @throws Exception if an error occurred
*/
protected abstract void startInternal() throws Exception;
Let's look at how the subclass implements startInternal:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal
@Override
public void startInternal() throws Exception {
    // 1. Leader election
    leaderElectionService.start(this);
    startExecutionGraphCacheCleanupTask();

    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}
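The split between start() in the base class and startInternal() in the subclass is the template method pattern: the base class owns the common sequence and calls the hook once the server is up. The following is a minimal sketch of that shape, under the assumption that the hook runs after the state has been set to RUNNING, as the order of sections 3.3.5 and 3.4 suggests. Both class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class mirroring the start()/startInternal() split; not a Flink class. */
abstract class LifecycleEndpointSketch {
    enum State {
        CREATED,
        RUNNING
    }

    private final Object lock = new Object();
    private State state = State.CREATED;
    protected final List<String> events = new ArrayList<>();

    /** Common startup logic; final so that subclasses cannot skip it. */
    public final void start() throws Exception {
        synchronized (lock) {
            if (state != State.CREATED) {
                throw new IllegalStateException("The endpoint cannot be restarted.");
            }
            events.add("server bound"); // stands in for router, handlers and Netty setup
            state = State.RUNNING;
            startInternal(); // hook: subclass-specific services start last
        }
    }

    /** Hook to start subclass-specific services. */
    protected abstract void startInternal() throws Exception;

    State getState() {
        synchronized (lock) {
            return state;
        }
    }
}

/** Hypothetical subclass playing the role of WebMonitorEndpoint. */
final class WebMonitorLikeEndpointSketch extends LifecycleEndpointSketch {
    @Override
    protected void startInternal() {
        events.add("leader election started");
        events.add("cache cleanup scheduled");
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) throws Exception {
        WebMonitorLikeEndpointSketch endpoint = new WebMonitorLikeEndpointSketch();
        endpoint.start();
        System.out.println(endpoint.events() + " state=" + endpoint.getState());
    }
}
```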
3.4.1 Leader election
When HighAvailabilityServices is initialized, a different HighAvailabilityServices implementation is created depending on the configured high-availability type.
The leaderElectionService is obtained from it and passed in when the WebMonitorEndpoint is created:
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
3.4.1.1 Standalone mode: a StandaloneLeaderElectionService is created
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    synchronized (lock) {
        checkNotShutdown();

        return new StandaloneLeaderElectionService();
    }
}
For the election itself, the contender is made leader immediately. The contender here is the WebMonitorEndpoint.
@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }

    contender = Preconditions.checkNotNull(newContender);

    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
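The standalone behavior is small enough to reproduce in a few lines. The sketch below uses a hypothetical LeaderContenderSketch interface and a fixed all-zero UUID as the session ID. That value is an assumption standing in for HighAvailabilityServices.DEFAULT_LEADER_ID, whose definition is not shown in this article.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for the contender interface. */
interface LeaderContenderSketch {
    void grantLeadership(UUID leaderSessionId);
}

/** Hypothetical stand-in for a standalone election service; not Flink code. */
final class StandaloneElectionSketch {
    /** Fixed session ID; assumed stand-in for the default leader ID. */
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private LeaderContenderSketch contender;

    void start(LeaderContenderSketch newContender) {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // With a single process there is nobody to compete with: grant leadership at once.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }

    public static void main(String[] args) {
        StandaloneElectionSketch service = new StandaloneElectionSketch();
        service.start(id -> System.out.println("granted leadership with session ID " + id));
    }
}
```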
3.4.1.2 ZooKeeper HA mode: a DefaultLeaderElectionService is created
org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    // Implemented by subclasses: creates the leader election service
    return createLeaderElectionService(getLeaderPathForRestServer());
}
Subclass implementation:
// org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath) {
    return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
}

// Create the DefaultLeaderElectionService
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
        final CuratorFramework client, final String path) {
    return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
}
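DefaultLeaderElectionService then starts the leader election. The contender passed in here is the WebMonitorEndpoint.
Flink's ZooKeeper-based election uses the Curator framework. For each contender, an election driver (leaderElectionDriver) is created. Curator then reports the outcome through two callbacks: isLeader is called once the contender has been elected leader, and notLeader is called when it loses leadership.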
// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");

    synchronized (lock) {
        running = true;
        /*
          When called from WebMonitorEndpoint, the contender is the DispatcherRestEndpoint.
          When called from ResourceManager, the contender is the ResourceManager.
          When called from DispatcherRunner, the contender is the DispatcherRunner.
         */
        leaderContender = contender;
        // One election driver (leaderElectionDriver) is created per contender
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}
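The callback-driven flow can be illustrated without ZooKeeper. The sketch below replaces the Curator latch with a purely in-memory queue: the first participant becomes leader and gets isLeader, and when it leaves it gets notLeader and the next participant is promoted. This is a simplified model for illustration, not Curator's or Flink's implementation, and every type name in it is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical listener with the two callbacks described above. */
interface LatchListenerSketch {
    void isLeader();

    void notLeader();
}

/** In-memory model of a leader latch: the head of the queue holds leadership. */
final class InMemoryLeaderLatchSketch {
    private final Deque<LatchListenerSketch> participants = new ArrayDeque<>();

    synchronized void join(LatchListenerSketch listener) {
        participants.addLast(listener);
        if (participants.size() == 1) {
            listener.isLeader(); // nobody else is waiting, so leadership is granted
        }
    }

    synchronized void leave(LatchListenerSketch listener) {
        boolean wasLeader = participants.peekFirst() == listener;
        participants.remove(listener);
        if (wasLeader) {
            listener.notLeader();
            LatchListenerSketch next = participants.peekFirst();
            if (next != null) {
                next.isLeader(); // promote the longest-waiting participant
            }
        }
    }
}

/** Contender that records which callbacks it received. */
final class RecordingContenderSketch implements LatchListenerSketch {
    private final String name;
    private final List<String> events;

    RecordingContenderSketch(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    @Override
    public void isLeader() {
        events.add(name + ":isLeader");
    }

    @Override
    public void notLeader() {
        events.add(name + ":notLeader");
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        InMemoryLeaderLatchSketch latch = new InMemoryLeaderLatchSketch();
        RecordingContenderSketch a = new RecordingContenderSketch("a", events);
        RecordingContenderSketch b = new RecordingContenderSketch("b", events);
        latch.join(a); // a becomes leader
        latch.join(b); // b waits, no callback yet
        latch.leave(a); // a loses leadership, b is promoted
        System.out.println(events);
    }
}
```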
ZooKeeperLeaderElectionDriverFactory creates the ZooKeeperLeaderElectionDriver. The LeaderElectionDriver is responsible for performing the leader election and for storing the leader information.
// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService instance
        FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String path,
        LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService is passed in
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    checkNotNull(path);
    this.client = checkNotNull(client);
    this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
    leaderLatch = new LeaderLatch(client, leaderLatchPath);
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client