ES源码分析Transport模块的初始化

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES源码分析Transport模块的初始化相关的知识,希望对你有一定的参考价值。

文章目录

Transport模块的初始化

源码基于6.7.2

传输模块的初始化主要的在节点启动时的构造函数中完成的。

节点启动时,主要在构建函数中,进行通信模块的初始化。

    protected Node(
            final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) 
        logger = LogManager.getLogger(Node.class);
        final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
        boolean success = false;
        try 
            ...
            ...
            ...
            // 过滤出ActionPlugin插件列表作为参数传入 ActionModule构造函数,在ActionModule中会对TCP和HTTP的请求和处理类进行注册和绑定。
            ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
            ...

            final RestController restController = actionModule.getRestController();
            // 过滤出NetworkPlugin插件列表作为参数传入 NetworkModule构造函数
            final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController);
            ...
            ...
            ...  
            // 通过网络模块获取已经初始化的Transport
            final Transport transport = networkModule.getTransportSupplier().get();
            Set<String> taskHeaders = Stream.concat(
                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                Stream.of(Task.X_OPAQUE_ID)
            ).collect(Collectors.toSet());
            // 基于网络模块的Transport构建TransportService
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
            final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
            // 基于TransportService构建searchTransportService服务
            final SearchTransportService searchTransportService =  new SearchTransportService(transportService,
                SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
            final Consumer<Binder> httpBind;
            final HttpServerTransport httpServerTransport;
            // 通过网络模块获取已经初始化的Transport,HTTP还可以关闭?
            if (networkModule.isHttpEnabled()) 
                httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
                httpBind = b -> 
                    b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                ;
             else 
                httpBind = b -> 
                    b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
                ;
                httpServerTransport = null;
            
            ...
            ...
            ...    
            if (NetworkModule.HTTP_ENABLED.get(settings)) 
                logger.debug("initializing HTTP handlers ...");
                // 初始化REST的请求和处理类的映射
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            
            logger.info("initialized");

            success = true;
            ...
            ...
            ...  
    

ActionModule的初始化

ActionModule的内部初始化是通过插件的方式加载的,主要完成注册Action与处理类的映射和RestController的创建。

    public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                        IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                        ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
            CircuitBreakerService circuitBreakerService, UsageService usageService) 
        this.transportClient = transportClient;
        this.settings = settings;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.indexScopedSettings = indexScopedSettings;
        this.clusterSettings = clusterSettings;
        this.settingsFilter = settingsFilter;
        this.actionPlugins = actionPlugins;
        // 进行Action设置,action注册和对应handler的映射绑定
        actions = setupActions(actionPlugins);
        // 对action过滤器进行配置
        actionFilters = setupActionFilters(actionPlugins);
        autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
        destructiveOperations = new DestructiveOperations(settings, clusterSettings);
        Set<String> headers = Stream.concat(
            actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
            Stream.of(Task.X_OPAQUE_ID)
        ).collect(Collectors.toSet());
        // rest包装器
        UnaryOperator<RestHandler> restWrapper = null;
        for (ActionPlugin plugin : actionPlugins) 
            UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
            if (newRestWrapper != null) 
                logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
                if (restWrapper != null) 
                    throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
                
                restWrapper = newRestWrapper;
            
        
        mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
            actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
        );

        if (transportClient) 
            restController = null;
         else 
            // 构建出RestController对象
            restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
        
    

NetworkModule的初始化

构造NetworkModule对象,在执行构造函数的时候,通过插件的方式加载这3个成员对象。

主要数据成员对象

Map<String, Supplier<Transport>> transportFactories
Map<String, Supplier<HttpServerTransport>> transportHttpFactories
List<TransportInterceptor> transportIntercetors

Transport :负责内部节点的RPC请求

HttpServerTransport:负责客户端的REST服务

TransportInterceptor:传输层拦截器

    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) 
        this.settings = settings;
        this.transportClient = transportClient;
        // 遍历插件,分别注册HttpTransport,Transport,TransportInterceptor
        for (NetworkPlugin plugin : plugins) 
            if (transportClient == false && HTTP_ENABLED.get(settings)) 
                Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                    circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) 
                    // 其实检查不能使用TransportClient创建HTTP的通信传输,不能有同名的HttpServerTransport
                    registerHttpTransport(entry.getKey(), entry.getValue());
                
            
            Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) 
                // 检查不有同名的Transport
                registerTransport(entry.getKey(), entry.getValue());
            
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) 
                // 注册传输层拦截器
                registerTransportInterceptor(interceptor);
            
        
    

在构造完成时候,通过getTransportSuppliergetHttpServerTransportSuppliergetTransportInterceptor对外提供服务。

NetworkPlugin

NetworkPlugin是一个接口,Netty4Plugin实现了它,同时继承了Plugin。

在Netty4Plugin中实现了NetworkPlugin的getTransports和getHttpTransports方法,分别构建了Netty4Transport和Netty4HttpServerTransport用于Transport(TCP)传输和HTTP传输。

Netty4Transport

通过类图,可以发现Netty4Transport继承了TcpTransport,TcpTransport实现了Transport接口,这应该是在实现传输层,控制数据的在传输层的交互。Netty4Transport中实现了doStart的抽象方法,用来启动TCP服务。在启动的时候,默认情况下,同时构建了Client端和Server端。主要借助netty4框架来实现这些功能。

    @Override
    protected void doStart() 
        boolean success = false;
        try 
            ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
            eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
            // 初始化客户端
            clientBootstrap = createClientBootstrap(eventLoopGroup);
            // 默认是开启服务端配置的,初始化Server端
            if (NetworkService.NETWORK_SERVER.get(settings)) 
                for (ProfileSettings profileSettings : profileSettings) 
                    createServerBootstrap(profileSettings, eventLoopGroup);
                    bindServer(profileSettings);
                
            
            super.doStart();
            success = true;
         finally 
            if (success == false) 
                doStop();
            
        
    

Netty4HttpServerTransport

根据类图发现 Netty4HttpServerTransport 同时继承了AbstractLifecycleComponent和实现了HttpServerTransport。

同样实现了doStart的抽象方法,用来启动HTTP Server服务。在HTTP Server服务中配置了监听端口和处理器。这里其实应该是通过Netty4来完成HTTP协议下的传输层的部分。

@Override
    protected void doStart() 
       ...
            // 配置了请求的处理类HttpChannelHandler
            serverBootstrap.childHandler(configureServerChannelHandler());
       ...
           // 绑定端口作为HTTP监听端口
            this.boundAddress = createBoundHttpAddress();
       ...
    

configureServerChannelHandler方法中构建了一个HttpChannelHandler对象,HttpChannelHandler的构造函数中有两个成员变量Netty4HttpServerTransportNetty4HttpRequestHandler。当收到请求的时候,会调用dispatchRequest对不同的请求执行相应的处理。dispatchRequest是接口HttpServerTransport类的内部接口Dispatcher中的方法。主要用来转发请求。它的主要实现类是RestController

TransportService的初始化

在Node的启动过程中会在初始化Transport之后,基于Transport构建TransportService。newTransportService方法中其实是调用TransportService的构造函数。

    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor, Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) 
        this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
             // 创建连接管理器
            new ConnectionManager(settings, transport));
    

ConnectionManager是Node传输连接的管理类,通过这个类的connectionManager.connectToNode(node, connectionProfile, connectionValidator(node));解析连接配置文件创建内部连接。在ConnectionManager的构造函数中调用ConnectionProfile.buildDefaultConnectionProfile(settings),这个方法会根据配置文件提供的信息创建连接。在TransportRequestOptions.Type中发现连接的分类。

public class TransportRequestOptions 
  ...
  ...
  ...
  public enum Type 
        RECOVERY, // 用于恢复
        BULK,     // 用于批量写入
        REG,      // 不是很清楚,有一个是集群注册
        STATE,    // 传输集群的状态
        PING      // ping请求
    
    

TransportSettings类中发现各类TCP连接的连接数,默认合计13个。

public final class TransportSettings 
        ...
        ...
        ...        
    public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
        intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
        intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
        intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
        intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
        intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
        ...
        ...
        ...
    
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) 
    ElasticSearchEs 源码之 Transport 和 TransportService 源码解读

《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析

《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析

ES transport client使用

elasticSearch6源码分析http和transport模块

《Elasticsearch 源码解析与优化实战》第16章:ThreadPool模块分析