Flink 1.15 Source Code Analysis -- Starting the JobManager -- Starting the WebMonitorEndpoint

Posted by 宝哥大数据


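1. Preface

From the previous article, Flink 1.15 Source Code Analysis -- DispatcherResourceManagerComponent, we know that the WebMonitorEndpoint is created and started in the following method: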

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

            // Build a thread pool that executes the client requests received by the WebMonitorEndpoint
            final ScheduledExecutorService executor =
                    WebMonitorEndpoint.createExecutorService(
                            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                            "DispatcherRestEndpoint");

            // Initialize the MetricFetcher; the default update interval is 10 s
            final long updateInterval =
                    configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher =
                    updateInterval == 0
                            ? VoidMetricFetcher.INSTANCE
                            : MetricFetcherImpl.fromConfiguration(
                                    configuration,
                                    metricQueryServiceRetriever,
                                    dispatcherGatewayRetriever,
                                    executor);

            // Create the WebMonitorEndpoint, one of the three core components
            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                            fatalErrorHandler);
            // Start the WebMonitorEndpoint, one of the three core components
            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();
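The two preparatory steps above (a named thread pool, and a fetcher that degrades to a no-op when the interval is 0) can be sketched in plain Java. This is only an illustration under stated assumptions: `RestExecutorSketch`, `Fetcher`, `VoidFetcher`, `PollingFetcher`, `createExecutor` and `chooseFetcher` are hypothetical names invented for this sketch, not Flink classes or methods.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for illustration only; none of these types are Flink classes.
class RestExecutorSketch {

    interface Fetcher {
        void update();
    }

    /** No-op fetcher, playing the role that VoidMetricFetcher.INSTANCE plays above. */
    enum VoidFetcher implements Fetcher {
        INSTANCE;

        @Override
        public void update() {
            // intentionally does nothing
        }
    }

    /** Fetcher that would refresh metrics at a fixed interval. */
    static final class PollingFetcher implements Fetcher {
        final long intervalMillis;

        PollingFetcher(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        @Override
        public void update() {
            // a real implementation would query the metric services here
        }
    }

    /** Creates a scheduled pool whose daemon threads carry a recognizable name and a priority. */
    static ScheduledExecutorService createExecutor(int numThreads, int priority, String name) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name + "-thread-" + counter.incrementAndGet());
            thread.setDaemon(true);
            thread.setPriority(priority);
            return thread;
        };
        return Executors.newScheduledThreadPool(numThreads, factory);
    }

    /** An interval of 0 disables fetching, so the no-op singleton is returned. */
    static Fetcher chooseFetcher(long updateIntervalMillis) {
        return updateIntervalMillis == 0
                ? VoidFetcher.INSTANCE
                : new PollingFetcher(updateIntervalMillis);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor =
                createExecutor(4, Thread.NORM_PRIORITY, "DispatcherRestEndpoint");
        Fetcher fetcher = chooseFetcher(10_000L);
        executor.execute(fetcher::update);
        executor.shutdown();
    }
}
```

Returning a shared no-op object instead of null keeps the callers free of null checks, which is presumably why the original code takes the same shape.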

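In this article we walk through how the WebMonitorEndpoint is built and started.

2. Building the WebMonitorEndpoint

The WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used is decided when the DispatcherResourceManagerComponentFactory is created, and it differs depending on how the cluster is started.

Below we take StandaloneSessionClusterEntrypoint as an example and look at how restEndpointFactory is initialized.

2.1 Initializing restEndpointFactory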

1. StandaloneSessionClusterEntrypoint creates the DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory

    @Override
    protected DefaultDispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                StandaloneResourceManagerFactory.getInstance());
    }

2. createSessionComponentFactory wires up the factories for the three core components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory

    public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
            ResourceManagerFactory<?> resourceManagerFactory) {
        return new DefaultDispatcherResourceManagerComponentFactory(
                DefaultDispatcherRunnerFactory.createSessionRunner(
                        SessionDispatcherFactory.INSTANCE),
                resourceManagerFactory,
                SessionRestEndpointFactory.INSTANCE);
    }

So in session mode, restEndpointFactory is SessionRestEndpointFactory.INSTANCE.
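The wiring just described, where a static creator method fixes which endpoint factory a component factory carries, can be reduced to a few lines. The sketch below is a simplified model, not Flink code: `ComponentFactorySketch`, `Endpoint`, `EndpointFactory`, `SessionEndpointFactory` and `ComponentFactory` are hypothetical names, and the string "config" stands in for Flink's Configuration.

```java
// Hypothetical, simplified model of the factory wiring; not Flink classes.
class ComponentFactorySketch {

    interface Endpoint {
        String describe();
    }

    interface EndpointFactory {
        Endpoint create(String config);
    }

    /** Enum singleton, mirroring the INSTANCE idiom used by SessionRestEndpointFactory. */
    enum SessionEndpointFactory implements EndpointFactory {
        INSTANCE;

        @Override
        public Endpoint create(String config) {
            return () -> "session endpoint for " + config;
        }
    }

    /** Holds the endpoint factory chosen at construction time. */
    static final class ComponentFactory {
        private final EndpointFactory restEndpointFactory;

        private ComponentFactory(EndpointFactory restEndpointFactory) {
            this.restEndpointFactory = restEndpointFactory;
        }

        /** The creator method decides which endpoint factory is plugged in. */
        static ComponentFactory createSessionComponentFactory() {
            return new ComponentFactory(SessionEndpointFactory.INSTANCE);
        }

        Endpoint createEndpoint(String config) {
            return restEndpointFactory.create(config);
        }
    }

    public static void main(String[] args) {
        Endpoint endpoint =
                ComponentFactory.createSessionComponentFactory().createEndpoint("standalone");
        System.out.println(endpoint.describe());
    }
}
```

The enum singleton gives one stateless, serialization-safe instance per factory type, so the entrypoint only has to pick a creator method.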

2.2 createRestEndpoint creates the WebMonitorEndpoint

SessionRestEndpointFactory creates a DispatcherRestEndpoint:

/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);

        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}

The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher:

/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}

3. Starting the WebMonitorEndpoint

The call to webMonitorEndpoint.start() actually executes org.apache.flink.runtime.rest.RestServerEndpoint#start

3.1 Router

            // 1. First create the Router, which parses client requests and looks up the matching handler
            final Router router = new Router();

3.2 Registering the handlers

            // 2. Register the handlers
            // 2.1 Initialize the handlers
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
            handlers = initializeHandlers(restAddressFuture);

            // 2.2 Sort the handlers so that URLs with path parameters come after those without,
            // which gives the uniqueness check and the registration a well-defined order
            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
            // 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies uniqueness
            checkAllEndpointsAndHandlersAreUnique(handlers);
            // 2.4 Register the handlers
            handlers.forEach(handler -> registerHandler(router, handler, log));
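To make the ordering in the comment above concrete, here is a small comparator that reproduces it on plain URL strings, together with a duplicate check. It is a simplified stand-in written for this article, not Flink's RestHandlerUrlComparator or checkAllEndpointsAndHandlersAreUnique: the class name `HandlerOrderingSketch`, the ranking rule and the "METHOD url" key format are all assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for illustration; not Flink's actual comparator.
class HandlerOrderingSketch {

    /** Rank of one path segment: literal (0) < ":param" (1) < ":*" wildcard (2). */
    private static int rank(String segment) {
        if (segment.startsWith(":*")) {
            return 2;
        }
        if (segment.startsWith(":")) {
            return 1;
        }
        return 0;
    }

    /** Compares URLs segment by segment; on a shared prefix the shorter URL comes first. */
    static final Comparator<String> URL_ORDER = (a, b) -> {
        String[] sa = a.split("/");
        String[] sb = b.split("/");
        int common = Math.min(sa.length, sb.length);
        for (int i = 0; i < common; i++) {
            int byRank = Integer.compare(rank(sa[i]), rank(sb[i]));
            if (byRank != 0) {
                return byRank;
            }
            int byName = sa[i].compareTo(sb[i]);
            if (byName != 0) {
                return byName;
            }
        }
        return Integer.compare(sa.length, sb.length);
    };

    /** Returns a sorted copy, leaving the input untouched. */
    static List<String> sorted(List<String> urls) {
        List<String> copy = new ArrayList<>(urls);
        copy.sort(URL_ORDER);
        return copy;
    }

    /** Fails fast if the same "METHOD url" key is registered twice. */
    static void checkUnique(List<String> methodAndUrlKeys) {
        Set<String> seen = new HashSet<>();
        for (String key : methodAndUrlKeys) {
            if (!seen.add(key)) {
                throw new IllegalStateException("Duplicate REST handler registration: " + key);
            }
        }
    }

    public static void main(String[] args) {
        List<String> urls =
                Arrays.asList("/:*", "/jobs/:jobid/config", "/jobs/:jobid", "/jobs/overview", "/jobs");
        sorted(urls).forEach(System.out::println);
        checkUnique(Arrays.asList("GET /jobs", "GET /jobs/overview"));
    }
}
```

With this order, a literal segment such as `overview` is always considered before a parameter such as `:jobid` at the same position, and the catch-all `/:*` ends up last.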

3.3 Netty startup

3.3.1 Initializing the ChannelInitializer

            // 3.1 Initialize the ChannelInitializer
            ChannelInitializer<SocketChannel> initializer =
                    new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws ConfigurationException {
                            RouterHandler handler = new RouterHandler(router, responseHeaders);

                            // SSL should be the first handler in the pipeline
                            if (isHttpsEnabled()) {
                                ch.pipeline()
                                        .addLast(
                                                "ssl",
                                                new RedirectingSslHandler(
                                                        restAddress,
                                                        restAddressFuture,
                                                        sslHandlerFactory));
                            }

                            ch.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new FileUploadHandler(uploadDir))
                                    .addLast(
                                            new FlinkHttpObjectAggregator(
                                                    maxContentLength, responseHeaders));

                            for (InboundChannelHandlerFactory factory :
                                    inboundChannelHandlerFactories) {
                                Optional<ChannelHandler> channelHandler =
                                        factory.createHandler(configuration, responseHeaders);
                                if (channelHandler.isPresent()) {
                                    ch.pipeline().addLast(channelHandler.get());
                                }
                            }

                            ch.pipeline()
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(handler.getName(), handler)
                                    .addLast(new PipelineErrorHandler(log, responseHeaders));
                        }
                    };

3.3.2 Initializing the NioEventLoopGroups

            NioEventLoopGroup bossGroup =
                    new NioEventLoopGroup(
                            1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup =
                    new NioEventLoopGroup(
                            0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

            bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer);

3.3.3 Binding the REST endpoint

            // 3.3 Binding rest endpoint
            // 3.3.1 Parse the configured port range into an iterator of candidate ports
            Iterator<Integer> portsIterator;
            try {
                portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
            } catch (IllegalConfigurationException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalArgumentException(
                        "Invalid port range definition: " + restBindPortRange);
            }

            // 3.3.2 Handle port conflicts by trying the candidate ports one by one
            int chosenPort = 0;
            while (portsIterator.hasNext()) {
                try {
                    chosenPort = portsIterator.next();
                    final ChannelFuture channel;
                    // Bind to the address and port to obtain the channel
                    if (restBindAddress == null) {
                        channel = bootstrap.bind(chosenPort);
                    } else {
                        channel = bootstrap.bind(restBindAddress, chosenPort);
                    }
                    serverChannel = channel.syncUninterruptibly().channel();
                    break;
                } catch (final Exception e) {
                    // syncUninterruptibly() throws checked exceptions via Unsafe
                    // continue if the exception is due to the port being in use, fail early
                    // otherwise
                    if (!(e instanceof java.net.BindException)) {
                        throw e;
                    }
                }
            }

            if (serverChannel == null) {
                throw new BindException(
                        "Could not start rest endpoint on any port in port range "
                                + restBindPortRange);
            }

            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
                advertisedAddress = this.restAddress;
            } else {
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }

            port = bindAddress.getPort();

            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

            restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
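The retry loop above follows a simple rule: a port that is already in use is skipped, any other failure aborts immediately, and an exhausted range ends in a BindException. A minimal sketch of the same rule without Netty is shown below. `PortRangeBindSketch`, `Binder` and `bindFirstFree` are hypothetical names for this illustration, and the port range 8081-8090 in `main` is just an example value.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical illustration of the "try each port in the range" loop; not Flink code.
class PortRangeBindSketch {

    /** Abstracts the actual bind call so the loop can be exercised without real sockets. */
    @FunctionalInterface
    interface Binder {
        void bind(int port) throws IOException;
    }

    /**
     * Tries the candidate ports in order and returns the first one that binds.
     * A BindException means "port in use, try the next one"; any other
     * IOException propagates at once (fail early).
     */
    static int bindFirstFree(Iterator<Integer> ports, Binder binder) throws IOException {
        while (ports.hasNext()) {
            int candidate = ports.next();
            try {
                binder.bind(candidate);
                return candidate;
            } catch (BindException e) {
                // port already in use: move on to the next candidate
            }
        }
        throw new BindException("Could not bind to any port in the given range");
    }

    public static void main(String[] args) throws IOException {
        // Holder for the socket opened inside the lambda, so it can be closed afterwards.
        ServerSocket[] holder = new ServerSocket[1];
        int port = bindFirstFree(
                IntStream.rangeClosed(8081, 8090).iterator(),
                candidate -> holder[0] = new ServerSocket(candidate));
        System.out.println("Bound to port " + port);
        holder[0].close();
    }
}
```

Separating the loop from the bind call is a choice made for the sketch only; it lets the skipping behaviour be checked with a fake binder instead of occupying real ports.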

3.3.4 The REST address is now known, so restAddressFuture is completed

restAddressFuture.complete(restBaseUrl);
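The future completed here was created in step 2.1 and handed to the handlers before any port was bound, so they could be constructed first and learn the address later. The sketch below shows that pattern with plain java.util.concurrent. `RestAddressFutureSketch`, `RedirectHandler`, `redirectTarget` and the URL `http://localhost:8081` are hypothetical, chosen only to illustrate the idea.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of handing out a future before its value exists; not Flink code.
class RestAddressFutureSketch {

    /** A handler that needs the base URL but is constructed before the server is bound. */
    static final class RedirectHandler {
        private final CompletableFuture<String> restAddressFuture;

        RedirectHandler(CompletableFuture<String> restAddressFuture) {
            this.restAddressFuture = restAddressFuture;
        }

        /** Returns the redirect target, or null while the address is still unknown. */
        String redirectTarget(String path) {
            String base = restAddressFuture.getNow(null);
            return base == null ? null : base + path;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
        RedirectHandler handler = new RedirectHandler(restAddressFuture);

        // ... the server would bind its port here ...
        restAddressFuture.complete("http://localhost:8081");

        System.out.println(handler.redirectTarget("/jobs"));
    }
}
```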

3.3.5 Set the endpoint state to RUNNING. At this point the Netty server of the WebMonitorEndpoint is fully started.

state = State.RUNNING;

3.4 Hook for starting subclass-specific services


    /**
     * Hook to start sub class specific services.
     *
     * @throws Exception if an error occurred
     */
    protected abstract void startInternal() throws Exception;
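This hook is an instance of the template method pattern: the base class owns the common startup sequence and calls the abstract method once that sequence is done. The sketch below assumes, as the ordering of the sections in this article suggests, that the hook runs after the state has been switched to RUNNING. `EndpointLifecycleSketch` and `RecordingEndpoint` are hypothetical classes for illustration.

```java
// Hypothetical illustration of the start()/startInternal() template method; not Flink code.
abstract class EndpointLifecycleSketch {

    enum State {
        CREATED,
        RUNNING,
        SHUTDOWN
    }

    private final Object lock = new Object();
    private State state = State.CREATED;

    /** Common startup sequence; subclasses cannot override it, they only fill in the hook. */
    final void start() throws Exception {
        synchronized (lock) {
            if (state != State.CREATED) {
                throw new IllegalStateException("The endpoint cannot be restarted.");
            }
            // ... router, handlers and server bootstrap would be set up here ...
            state = State.RUNNING;
            startInternal();
        }
    }

    State getState() {
        synchronized (lock) {
            return state;
        }
    }

    /** Hook to start subclass-specific services. */
    protected abstract void startInternal() throws Exception;
}

/** Minimal subclass that records what it observes when the hook runs. */
class RecordingEndpoint extends EndpointLifecycleSketch {
    boolean internalStarted;
    State stateSeenByHook;

    @Override
    protected void startInternal() {
        stateSeenByHook = getState();
        internalStarted = true;
    }

    public static void main(String[] args) throws Exception {
        RecordingEndpoint endpoint = new RecordingEndpoint();
        endpoint.start();
        System.out.println("hook ran in state " + endpoint.stateSeenByHook);
    }
}
```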

Let's look at the concrete startInternal implementation in the subclass:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

    @Override
    public void startInternal() throws Exception {
        // 1. Leader election
        leaderElectionService.start(this);
        startExecutionGraphCacheCleanupTask();

        if (hasWebUI) {
            log.info("Web frontend listening at {}.", getRestBaseUrl());
        }
    }

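3.4.1 Leader election

When HighAvailabilityServices is initialized, a different implementation is created depending on the configured high-availability type.

The leaderElectionService is obtained when the WebMonitorEndpoint is created, through this argument: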

highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),

Taking standalone mode as an example, a StandaloneLeaderElectionService object is created:

    @Override
    public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }
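In standalone mode there is only one candidate, so no real election has to take place. As far as I understand it, the standalone service simply grants leadership to the contender as soon as it is started. The sketch below models that behaviour under this assumption. `StandaloneElectionSketch`, `Contender` and `StandaloneElection` are hypothetical names, and the all-zero UUID is used here as an assumed fixed leader id.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical model of a single-contender "election"; not Flink's implementation.
class StandaloneElectionSketch {

    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    static final class StandaloneElection {
        /** Fixed leader id: with a single contender there is nothing to distinguish. */
        static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

        private Contender contender;

        /** Grants leadership immediately, because nobody else can compete for it. */
        void start(Contender newContender) {
            if (contender != null) {
                throw new IllegalStateException("Leader election service was already started.");
            }
            contender = Objects.requireNonNull(newContender);
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }

        /** Revokes leadership when the service is stopped. */
        void stop() {
            if (contender != null) {
                contender.revokeLeadership();
                contender = null;
            }
        }
    }

    public static void main(String[] args) {
        StandaloneElection election = new StandaloneElection();
        election.start(new Contender() {
            @Override
            public void grantLeadership(UUID leaderSessionId) {
                System.out.println("granted leadership with id " + leaderSessionId);
            }

            @Override
            public void revokeLeadership() {
                System.out.println("leadership revoked");
            }
        });
        election.stop();
    }
}
```

In an HA setup the same contender interface would be driven by an external coordination service instead, and leadership could arrive later or be revoked while the process is running.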
