Flink 1.15 Source Code Analysis -- Starting the JobManager ---- Starting the WebMonitorEndpoint

Posted by 宝哥大数据




1. Introduction

From the previous article, Flink 1.15 Source Code Analysis ---- DispatcherResourceManagerComponent, we know where the WebMonitorEndpoint is created and started:

org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create

            // Build a thread pool that executes the requests WebMonitorEndpoint receives from clients
            final ScheduledExecutorService executor =
                    WebMonitorEndpoint.createExecutorService(
                            configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                            configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                            "DispatcherRestEndpoint");

            // Initialize the MetricFetcher; the default update interval is 10 s (0 disables it)
            final long updateInterval =
                    configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher =
                    updateInterval == 0
                            ? VoidMetricFetcher.INSTANCE
                            : MetricFetcherImpl.fromConfiguration(
                                    configuration,
                                    metricQueryServiceRetriever,
                                    dispatcherGatewayRetriever,
                                    executor);

            // Create WebMonitorEndpoint, one of the three core components
            webMonitorEndpoint =
                    restEndpointFactory.createRestEndpoint(
                            configuration,
                            dispatcherGatewayRetriever,
                            resourceManagerGatewayRetriever,
                            blobServer,
                            executor,
                            metricFetcher,
                            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                            fatalErrorHandler);
            // Start WebMonitorEndpoint, one of the three core components
            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();
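The executor above is a scheduled pool sized by `RestOptions.SERVER_NUM_THREADS` with threads at `RestOptions.SERVER_THREAD_PRIORITY`. A minimal JDK-only sketch of what such a factory method could look like follows; the class name `RestExecutors` and the thread-naming scheme are assumptions for illustration, not Flink's `ExecutorThreadFactory`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for WebMonitorEndpoint.createExecutorService: a scheduled pool
// whose threads carry a component-specific name prefix and a configured priority.
final class RestExecutors {
    private RestExecutors() {}

    static ScheduledExecutorService createExecutorService(
            int numThreads, int threadPriority, String componentName) {
        if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("Invalid thread priority: " + threadPriority);
        }
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory factory = runnable -> {
            // Assumed naming scheme: "Flink-<component>-thread-<n>"
            Thread t = new Thread(
                    runnable, "Flink-" + componentName + "-thread-" + counter.getAndIncrement());
            t.setPriority(threadPriority);
            t.setDaemon(true); // do not keep the JVM alive just for this pool
            return t;
        };
        return Executors.newScheduledThreadPool(numThreads, factory);
    }
}
```

The same executor is shared by the `MetricFetcher` and the REST handlers, which is why it is built once, before both of them.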

In this article we walk through how WebMonitorEndpoint is built and started.

2. Building WebMonitorEndpoint

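WebMonitorEndpoint is built by restEndpointFactory. The restEndpointFactory is handed to DefaultDispatcherResourceManagerComponentFactory when that factory is created, and which implementation is used depends on how the cluster is started.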

Next, we take StandaloneSessionClusterEntrypoint as an example and look at how restEndpointFactory is initialized.

2.1 Initializing restEndpointFactory

1. StandaloneSessionClusterEntrypoint creates the DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory

    @Override
    protected DefaultDispatcherResourceManagerComponentFactory
            createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                StandaloneResourceManagerFactory.getInstance());
    }

2. createSessionComponentFactory creates the factories for all three core components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory

    public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
            ResourceManagerFactory<?> resourceManagerFactory) {
        return new DefaultDispatcherResourceManagerComponentFactory(
                DefaultDispatcherRunnerFactory.createSessionRunner(
                        SessionDispatcherFactory.INSTANCE),
                resourceManagerFactory,
                SessionRestEndpointFactory.INSTANCE);
    }

So in session mode, restEndpointFactory is SessionRestEndpointFactory.INSTANCE.
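The factory is an enum singleton plugged into the component factory, so the startup mode only decides *which* factory is passed in. The sketch below illustrates that pattern with heavily simplified, hypothetical types (`RestEndpoint`, `ComponentFactory`, and a one-argument `createRestEndpoint`); the real interface takes the configuration, gateway retrievers, blob service, executor and more.

```java
// Hypothetical, simplified types illustrating the enum-singleton factory pattern.
interface RestEndpoint {
    String describe();
}

interface RestEndpointFactory {
    RestEndpoint createRestEndpoint(String restAddress);
}

// One stateless instance is enough, so an enum with a single constant serves as the singleton.
enum SessionRestEndpointFactory implements RestEndpointFactory {
    INSTANCE;

    @Override
    public RestEndpoint createRestEndpoint(String restAddress) {
        return () -> "DispatcherRestEndpoint@" + restAddress;
    }
}

final class ComponentFactory {
    private final RestEndpointFactory restEndpointFactory;

    ComponentFactory(RestEndpointFactory restEndpointFactory) {
        this.restEndpointFactory = restEndpointFactory;
    }

    // Mirrors the shape of createSessionComponentFactory: the session variant wires in
    // the session REST endpoint factory.
    static ComponentFactory createSessionComponentFactory() {
        return new ComponentFactory(SessionRestEndpointFactory.INSTANCE);
    }

    RestEndpoint create(String restAddress) {
        return restEndpointFactory.createRestEndpoint(restAddress);
    }
}
```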

2.2 createRestEndpoint creates the WebMonitorEndpoint

SessionRestEndpointFactory creates a DispatcherRestEndpoint

/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);

        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}

The DispatcherRestEndpoint created here is the REST endpoint of the Dispatcher:

/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}

3. Starting WebMonitorEndpoint

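webMonitorEndpoint.start() actually runs org.apache.flink.runtime.rest.RestServerEndpoint#start, which WebMonitorEndpoint inherits from its parent class RestServerEndpoint.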

3.1 Router

            // 1. First create the Router, which parses client requests and finds the matching handler
            final Router router = new Router();
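To make the router's job concrete, here is a toy router: URL patterns are split into segments, a segment starting with `:` captures a path parameter, and `/:*` matches anything. `MiniRouter` is hypothetical, and the "first registered match wins" rule is an assumption of this sketch rather than a description of Flink's `Router` internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical minimal router; routes are tried in registration order.
final class MiniRouter {
    static final class Match {
        private final String handler;
        private final Map<String, String> pathParams;

        Match(String handler, Map<String, String> pathParams) {
            this.handler = handler;
            this.pathParams = pathParams;
        }

        String handler() { return handler; }

        Map<String, String> pathParams() { return pathParams; }
    }

    private static final class Route {
        final String[] segments;
        final String handler;

        Route(String[] segments, String handler) {
            this.segments = segments;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    MiniRouter addRoute(String pattern, String handler) {
        routes.add(new Route(split(pattern), handler));
        return this;
    }

    Optional<Match> route(String path) {
        String[] parts = split(path);
        for (Route r : routes) {
            Map<String, String> params = matchSegments(r.segments, parts);
            if (params != null) {
                return Optional.of(new Match(r.handler, params));
            }
        }
        return Optional.empty();
    }

    // Returns the captured path parameters, or null if the pattern does not match.
    private static Map<String, String> matchSegments(String[] pattern, String[] parts) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < pattern.length; i++) {
            String seg = pattern[i];
            if (seg.equals(":*")) {
                return params; // catch-all: matches the rest of the path
            }
            if (i >= parts.length) {
                return null;
            }
            if (seg.startsWith(":")) {
                params.put(seg.substring(1), parts[i]);
            } else if (!seg.equals(parts[i])) {
                return null;
            }
        }
        return pattern.length == parts.length ? params : null;
    }

    private static String[] split(String path) {
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.isEmpty() ? new String[0] : trimmed.split("/");
    }
}
```

Under a first-match rule, registering `/jobs/:jobid` before `/jobs/overview` would make `overview` be captured as a job ID, which is one reason a deterministic handler order is useful.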

3.2 Registering the handlers

            // 2. Register the handlers
            // 2.1 Initialize the handlers
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
            handlers = initializeHandlers(restAddressFuture);

            // 2.2 Sort the handlers so that every URL is matched by the correct handler
            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
            // 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique verifies that they are unique
            checkAllEndpointsAndHandlersAreUnique(handlers);
            // 2.4 Register the handlers with the router
            handlers.forEach(handler -> registerHandler(router, handler, log));
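The order in the comment above (`/jobs`, `/jobs/overview`, `/jobs/:jobid`, `/jobs/:jobid/config`, `/:*`) can be produced by a character-by-character, case-insensitive comparison in which `:` (the start of a path parameter) sorts after every other character and a prefix sorts before its extensions. The comparator and uniqueness check below are hypothetical sketches in that spirit, not the code of `RestHandlerUrlComparator` or `checkAllEndpointsAndHandlersAreUnique`.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical URL ordering: static segments before path parameters, prefixes first.
final class UrlOrder implements Comparator<String> {
    static final UrlOrder INSTANCE = new UrlOrder();

    @Override
    public int compare(String a, String b) {
        int n = Math.min(a.length(), b.length());
        for (int i = 0; i < n; i++) {
            char c1 = Character.toLowerCase(a.charAt(i));
            char c2 = Character.toLowerCase(b.charAt(i));
            if (c1 != c2) {
                if (c1 == ':') {
                    return 1; // a parameter sorts after any literal character
                }
                if (c2 == ':') {
                    return -1;
                }
                return Character.compare(c1, c2);
            }
        }
        return Integer.compare(a.length(), b.length()); // shorter prefix first
    }

    // Hypothetical uniqueness check over "METHOD url" keys.
    static void checkUnique(List<String> methodAndUrls) {
        Set<String> seen = new HashSet<>();
        for (String key : methodAndUrls) {
            if (!seen.add(key)) {
                throw new IllegalStateException("Duplicate REST endpoint: " + key);
            }
        }
    }
}
```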

3.3 Starting Netty

3.3.1 Initializing the ChannelInitializer

            // 3.1 Initialize the ChannelInitializer
            ChannelInitializer<SocketChannel> initializer =
                    new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws ConfigurationException {
                            RouterHandler handler = new RouterHandler(router, responseHeaders);

                            // SSL should be the first handler in the pipeline
                            if (isHttpsEnabled()) {
                                ch.pipeline()
                                        .addLast(
                                                "ssl",
                                                new RedirectingSslHandler(
                                                        restAddress,
                                                        restAddressFuture,
                                                        sslHandlerFactory));
                            }

                            ch.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new FileUploadHandler(uploadDir))
                                    .addLast(
                                            new FlinkHttpObjectAggregator(
                                                    maxContentLength, responseHeaders));

                            for (InboundChannelHandlerFactory factory :
                                    inboundChannelHandlerFactories) {
                                Optional<ChannelHandler> channelHandler =
                                        factory.createHandler(configuration, responseHeaders);
                                if (channelHandler.isPresent()) {
                                    ch.pipeline().addLast(channelHandler.get());
                                }
                            }

                            ch.pipeline()
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(handler.getName(), handler)
                                    .addLast(new PipelineErrorHandler(log, responseHeaders));
                        }
                    };
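Because inbound data flows through the pipeline in the order handlers were added, the order above matters: TLS must decrypt before HTTP decoding, and the aggregator must produce a complete request before routing. The hypothetical model below just records that order as strings (no Netty involved); the one-line role comments are our reading of each handler, not Flink documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the handler order that initChannel() builds.
final class RestPipelineModel {
    static List<String> handlerOrder(boolean httpsEnabled, List<String> extraInboundHandlers) {
        List<String> pipeline = new ArrayList<>();
        if (httpsEnabled) {
            pipeline.add("ssl");                   // must see the raw encrypted bytes first
        }
        pipeline.add("HttpServerCodec");           // bytes <-> HTTP request/response objects
        pipeline.add("FileUploadHandler");         // stores multipart uploads in uploadDir
        pipeline.add("FlinkHttpObjectAggregator"); // merges HTTP chunks up to maxContentLength
        pipeline.addAll(extraInboundHandlers);     // from InboundChannelHandlerFactory instances
        pipeline.add("ChunkedWriteHandler");       // supports streaming large responses
        pipeline.add("RouterHandler");             // dispatches to the registered REST handler
        pipeline.add("PipelineErrorHandler");      // last resort for unhandled errors
        return pipeline;
    }
}
```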

3.3.2 Initializing the NioEventLoopGroups

            NioEventLoopGroup bossGroup =
                    new NioEventLoopGroup(
                            1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup =
                    new NioEventLoopGroup(
                            0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

            bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer);
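The boss group gets exactly one thread, which only accepts connections; the worker group, which handles the I/O of accepted channels, is created with `0` threads. In Netty, a requested count of 0 means "use the default". The helper below is a hypothetical approximation of that default: the `io.netty.eventLoopThreads` system property, otherwise twice the available processors, at least 1. Netty's own code also honors its processor-count override, which this sketch ignores.

```java
// Hypothetical approximation of how Netty resolves an event loop thread count of 0.
final class EventLoopThreads {
    private EventLoopThreads() {}

    static int resolve(int requested) {
        if (requested != 0) {
            return requested; // an explicit count (e.g. 1 for the boss group) is used as-is
        }
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        int configured = Integer.getInteger("io.netty.eventLoopThreads", fallback);
        return Math.max(1, configured);
    }
}
```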

3.3.3 Binding the REST endpoint

            // 3.3 Bind the REST endpoint
            // 3.3.1 Get the range of candidate ports
            Iterator<Integer> portsIterator;
            try {
                portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
            } catch (IllegalConfigurationException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalArgumentException(
                        "Invalid port range definition: " + restBindPortRange);
            }

            // 3.3.2 Handle port conflicts: try the ports one by one until one is free
            int chosenPort = 0;
            while (portsIterator.hasNext()) {
                try {
                    chosenPort = portsIterator.next();
                    final ChannelFuture channel;
                    // Bind to the address and port to obtain the channel
                    if (restBindAddress == null) {
                        channel = bootstrap.bind(chosenPort);
                    } else {
                        channel = bootstrap.bind(restBindAddress, chosenPort);
                    }
                    serverChannel = channel.syncUninterruptibly().channel();
                    break;
                } catch (final Exception e) {
                    // syncUninterruptibly() throws checked exceptions via Unsafe
                    // continue if the exception is due to the port being in use, fail early
                    // otherwise
                    if (!(e instanceof java.net.BindException)) {
                        throw e;
                    }
                }
            }

            if (serverChannel == null) {
                throw new BindException(
                        "Could not start rest endpoint on any port in port range "
                                + restBindPortRange);
            }

            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
                advertisedAddress = this.restAddress;
            } else {
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }

            port = bindAddress.getPort();

            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

            restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
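The same bind strategy can be reproduced with plain `java.net.ServerSocket`: expand a range such as `50100-50200,50300`, try each port in order, skip ports that fail with `BindException`, rethrow any other error, and advertise the configured address when bound to the wildcard address. `PortBinding` and its methods are hypothetical; the range syntax is assumed to resemble what `NetUtils.getPortRangeFromString` accepts.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical JDK-only sketch of the port-range bind loop.
final class PortBinding {
    private PortBinding() {}

    // Expands e.g. "8081-8083,9000" into [8081, 8082, 8083, 9000].
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int from = Integer.parseInt(p.substring(0, dash).trim());
                int to = Integer.parseInt(p.substring(dash + 1).trim());
                if (from > to) {
                    throw new IllegalArgumentException("Invalid port range definition: " + spec);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    static ServerSocket bindFirstFree(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(port)); // wildcard address
                return socket;
            } catch (BindException e) {
                socket.close(); // port already in use: try the next candidate
            } catch (IOException e) {
                socket.close();
                throw e; // any other failure: fail early
            }
        }
        throw new BindException("Could not bind to any port in the given range");
    }

    // Wildcard binds (0.0.0.0) are not reachable addresses, so advertise the configured one.
    static String advertisedAddress(InetSocketAddress bound, String configuredRestAddress) {
        return bound.getAddress().isAnyLocalAddress()
                ? configuredRestAddress
                : bound.getAddress().getHostAddress();
    }
}
```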

3.3.4 Completing restAddressFuture with the final REST URL

restAddressFuture.complete(restBaseUrl);
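`restAddressFuture` is created before the pipeline is built (it is passed to `RedirectingSslHandler`), but it can only be completed after a port has been chosen. A `CompletableFuture` bridges that gap. The hypothetical `RedirectTarget` below shows the hand-off: it is wired up with the future early and learns the URL only when `complete(...)` is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical consumer that is constructed before the server is bound.
final class RedirectTarget {
    private final AtomicReference<String> target = new AtomicReference<>();

    RedirectTarget(CompletableFuture<String> restAddressFuture) {
        // Runs once the future is completed (immediately if it already is).
        restAddressFuture.thenAccept(target::set);
    }

    String currentTarget() {
        return target.get(); // null until the endpoint has been bound
    }
}
```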

3.3.5 Set the endpoint state to RUNNING. At this point the Netty server of WebMonitorEndpoint is fully started.

state = State.RUNNING;

3.4 A hook that starts subclass-specific services


    /**
     * Hook to start sub class specific services.
     *
     * @throws Exception if an error occurred
     */
    protected abstract void startInternal() throws Exception;
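This is the template-method pattern: the base class's `start()` performs the shared work (handlers, Netty, binding), switches the state to RUNNING, and then calls the abstract hook. The skeleton below is a hypothetical illustration; the class names, the log strings and the "start only once" check are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton of the start()/startInternal() template method.
abstract class ServerEndpointSketch {
    enum State { CREATED, RUNNING, SHUTDOWN }

    protected final List<String> log = new ArrayList<>();
    private State state = State.CREATED;

    final synchronized void start() throws Exception {
        if (state != State.CREATED) {
            throw new IllegalStateException("Endpoint cannot be restarted, state: " + state);
        }
        log.add("register handlers");   // stands in for sections 3.1 and 3.2
        log.add("bind server channel"); // stands in for section 3.3
        state = State.RUNNING;
        startInternal();                // hook for subclass-specific services
    }

    State state() {
        return state;
    }

    protected abstract void startInternal() throws Exception;
}

final class WebMonitorEndpointSketch extends ServerEndpointSketch {
    @Override
    protected void startInternal() {
        log.add("start leader election");
        log.add("schedule ExecutionGraph cache cleanup");
    }
}
```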

Let's look at the subclass's implementation of startInternal:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal

    @Override
    public void startInternal() throws Exception {
        // 1. Leader election
        leaderElectionService.start(this);
        startExecutionGraphCacheCleanupTask();

        if (hasWebUI) {
            log.info("Web frontend listening at {}.", getRestBaseUrl());
        }
    }
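Besides leader election, `startInternal()` starts a periodic cleanup of the ExecutionGraph cache. The sketch below shows the general idea with a hypothetical time-to-live cache whose `cleanup()` is scheduled with `scheduleWithFixedDelay`; `TtlCache` and its injectable clock are assumptions, not Flink's `ExecutionGraphCache`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical TTL cache with a periodically scheduled cleanup.
final class TtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long deadline;

        Entry(T value, long deadline) {
            this.value = value;
            this.deadline = deadline;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so the cache can be tested without sleeping

    TtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong() + ttlMillis));
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    int size() {
        return entries.size();
    }

    // Drops every entry whose deadline has passed.
    void cleanup() {
        long now = clock.getAsLong();
        entries.values().removeIf(e -> e.deadline <= now);
    }

    ScheduledFuture<?> scheduleCleanup(ScheduledExecutorService executor, long intervalMillis) {
        return executor.scheduleWithFixedDelay(
                this::cleanup, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }
}
```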

3.4.1 Leader election

When HighAvailabilityServices is initialized, a different HighAvailabilityServices implementation is created depending on the configured high-availability type.

The leaderElectionService is obtained when the WebMonitorEndpoint is created, through this argument:

highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
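A hypothetical sketch of the "depends on the high-availability type" decision: an empty value means no HA, `zookeeper` selects ZooKeeper, and any other value is treated as the class name of a custom HA services factory. The enum, the parsing rules and the returned labels are assumptions for illustration; they only name the two election services this article covers.

```java
import java.util.Locale;

// Hypothetical dispatch on the "high-availability" setting.
final class HaModeSketch {
    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    private HaModeSketch() {}

    static Mode fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Mode.NONE;
        }
        switch (value.trim().toUpperCase(Locale.ROOT)) {
            case "NONE":
                return Mode.NONE;
            case "ZOOKEEPER":
                return Mode.ZOOKEEPER;
            default:
                return Mode.FACTORY_CLASS; // assumed to be a fully qualified factory class name
        }
    }

    // Which election service the REST endpoint would get, per the two cases below.
    static String restEndpointElectionService(Mode mode) {
        switch (mode) {
            case NONE:
                return "StandaloneLeaderElectionService";
            case ZOOKEEPER:
                return "DefaultLeaderElectionService";
            default:
                return "provided by the custom factory";
        }
    }
}
```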

3.4.1.1 Standalone mode: a StandaloneLeaderElectionService is created

    @Override
    public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

Leader election here simply makes the contender the leader right away; the contender in this case is the WebMonitorEndpoint.

    @Override
    public void start(LeaderContender newContender) throws Exception {
        if (contender != null) {
            // Service was already started
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }

        contender = Preconditions.checkNotNull(newContender);

        // directly grant leadership to the given contender
        contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
    }
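Stripped of Flink's types, the standalone case is tiny: there is nothing to compete with, so the single contender is granted leadership immediately with a fixed ID. In this hypothetical sketch, `Contender` and `StandaloneElection` are illustrative names, and the all-zero UUID is assumed to correspond to `HighAvailabilityServices.DEFAULT_LEADER_ID`.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical minimal contender interface.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

final class StandaloneElection {
    // Assumed equivalent of HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private Contender contender;

    void start(Contender newContender) {
        if (contender != null) {
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // No competition in standalone mode: grant leadership right away.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```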

3.4.1.2 ZooKeeper HA mode: a DefaultLeaderElectionService is created

org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService

    @Override
    public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
        // Implemented by subclasses: create the leader election service
        return createLeaderElectionService(getLeaderPathForRestServer());
    }

The subclass implementation:

    // org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
    @Override
    protected LeaderElectionService createLeaderElectionService(String leaderPath) {
        return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
    }

    // Create the DefaultLeaderElectionService
    // org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
    public static DefaultLeaderElectionService createLeaderElectionService(
            final CuratorFramework client, final String path) {
        return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
    }

DefaultLeaderElectionService then starts the leader election; the contender passed in here is the WebMonitorEndpoint.

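Flink's leader election is built on the Curator framework. For each contender, a leader election driver (leaderElectionDriver) is created. When the election completes, one of two callbacks is invoked: isLeader if this contender won, or notLeader if it lost.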

// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            running = true;
            /*
             * When called from WebMonitorEndpoint, the contender is the DispatcherRestEndpoint;
             * when called from ResourceManager, the contender is the ResourceManager;
             * when called from DispatcherRunner, the contender is the DispatcherRunner.
             */
            leaderContender = contender;

            // Create one leader election driver (leaderElectionDriver) per contender
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
        }
    }
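The callback chain (driver, then service, then contender) can be modelled without Curator. In the hypothetical sketch below, `LatchDriver.isLeader()`/`notLeader()` stand in for Curator's latch callbacks, the service turns them into grant/revoke calls on its contender, and issuing a fresh random session ID per leadership term is an assumption of the sketch. All type names are illustrative.

```java
import java.util.UUID;

// Hypothetical, Curator-free model of driver -> service -> contender callbacks.
interface ElectionContender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

interface ElectionEventHandler {
    void onGrantLeadership();
    void onRevokeLeadership();
}

final class LatchDriver {
    private final ElectionEventHandler handler;

    LatchDriver(ElectionEventHandler handler) {
        this.handler = handler;
    }

    void isLeader() { handler.onGrantLeadership(); }   // latch acquired: election won

    void notLeader() { handler.onRevokeLeadership(); } // latch lost: leadership revoked
}

final class ElectionServiceSketch implements ElectionEventHandler {
    private final Object lock = new Object();
    private ElectionContender contender;
    private LatchDriver driver;
    private UUID issuedSessionId;

    void start(ElectionContender newContender) {
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            contender = newContender;
            driver = new LatchDriver(this); // one driver per contender, reporting back to us
        }
    }

    LatchDriver driver() { return driver; }

    UUID issuedSessionId() {
        synchronized (lock) {
            return issuedSessionId;
        }
    }

    @Override
    public void onGrantLeadership() {
        synchronized (lock) {
            issuedSessionId = UUID.randomUUID(); // new session ID for this leadership term
            contender.grantLeadership(issuedSessionId);
        }
    }

    @Override
    public void onRevokeLeadership() {
        synchronized (lock) {
            issuedSessionId = null;
            contender.revokeLeadership();
        }
    }
}
```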


ZooKeeperLeaderElectionDriverFactory creates a ZooKeeperLeaderElectionDriver. A LeaderElectionDriver is responsible for running the leader election and storing the leader information.

// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService
            FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client, ...
