网络模块 transport

Posted 0x13

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络模块 transport相关的知识,希望对你有一定的参考价值。

【模块说明】 transport-netty4 modules/transport-netty4: 这个就是网络模块包,需要通过 gradle 打包之后将整个模块包生成的 jar 包复制到数据目录下,通过 PluginsService.getModuleBundles() 进行加载。模块的功能是提供基础的 TCP、HTTP 服务,例如通信等等。具体通信消息的业务逻辑处理不在这个包内。模块目录结构如下:
处理HTTP协议编解码: org\\elasticsearch\\http\\netty4\\Netty4HttpChannel.java            netty 通道对象 Channel 的包装类 org\\elasticsearch\\http\\netty4\\Netty4HttpPipeliningHandler.java       org\\elasticsearch\\http\\netty4\\Netty4HttpRequest.java        封装的 HttpRequest 对象 org\\elasticsearch\\http\\netty4\\Netty4HttpRequestCreator.java org\\elasticsearch\\http\\netty4\\Netty4HttpRequestHandler.java    netty 处理 http 请求的 pipline 处理类 org\\elasticsearch\\http\\netty4\\Netty4HttpResponse.java org\\elasticsearch\\http\\netty4\\Netty4HttpResponseCreator.java org\\elasticsearch\\http\\netty4\\Netty4HttpServerChannel.java org\\elasticsearch\\http\\netty4\\Netty4HttpServerTransport.java        核心的 HTTP 服务。
处理TCP协议编解码,基础节点通信: org\\elasticsearch\\transport\\CopyBytesServerSocketChannel.java org\\elasticsearch\\transport\\CopyBytesSocketChannel.java org\\elasticsearch\\transport\\netty4\\ESLoggingHandler.java org\\elasticsearch\\transport\\netty4\\Netty4MessageChannelHandler.java        封装的 netty pipline 处理类 org\\elasticsearch\\transport\\netty4\\Netty4TcpChannel.java                netty 通道对象 Channel 的包装类,client 端。 org\\elasticsearch\\transport\\netty4\\Netty4TcpServerChannel.java        netty 通道对象 Channel 的包装类,server 端。 org\\elasticsearch\\transport\\netty4\\Netty4Transport.java                核心的 TCP 服务,节点之间创建链接。 org\\elasticsearch\\transport\\netty4\\Netty4Utils.java org\\elasticsearch\\transport\\Netty4NiosocketChannel.java
公共处理: org\\elasticsearch\\transport\\Netty4Plugin.java              网络模块插件的加载入口。 org\\elasticsearch\\transport\\NettyAllocator.java org\\elasticsearch\\transport\\NettyByteBufSizer.java org\\elasticsearch\\transport\\SharedGroupFactory.java
模块打包: 使用 elasticsearch(root)/modules/transport-netty4/Tasks/build/jar    生成jar 使用 elasticsearch(root)/modules/transport-netty4/Tasks/build/clean    清除jar 生成的依赖jar位置: modules/trnsport-netty4/distributins/transport-netty4-client-7.14.0-SNAPSHOT.jar 替换掉es数据目录下面对应模块的 jar 即可: 在发行版es数据 modules 目录中一共有下列几个地方用到这个依赖。都需要进行替换: esDate/modules/x-pack-core/ transport-netty4-client-7.14.0.jar esDate/modules/transport-netty4/ transport-netty4-client-7.14.0.jar 【模块说明】 x-pack-security 在发行版 es 数据目录下 modules/x-pack-security 的模块,里面有个依赖  x-pack-security-7.14.0.jar,里面包含的几个的个重要的和网络模块相关的类: SecurityNetty4ServerTransport    =>    默认处理集群节点之间rpc请求实现类。 SecurityNetty4HttpServerTransport    =>    默认处理集群接受外部用户rest请求实现类。 对应的源码类 x-pack/plugin/core/src/main/java/  路径下: org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport 打包使用 gradle 的 task: elasticsearch(root)/x-pack/plugin/security/Tasks/build/jar 打包生成的 x-pack-security-7.14.0-SNAPSHOT.jar 生成的位置: elasticsearch-7.14.0/x-pack/plugin/security/build/distributions/ x-pack-security-7.14.0-SNAPSHOT.jar 
【主要的类】 NetworkModule: 网络模块 NetworkModule 加载处理集群网络相关的逻辑的实现类的入口。该模块针对 TCP、HTTP 协议进行了封装,封装了 TCP 用于集群保持长连接通信,封装了  HTTP 用于处理各种外部 Rest 请求。如果需要在网络模块中添加自己的网络服务,也是在这里添加。该类有三个集合来保存服务类: (1)Map<String, Supplier<Transport>> transportFactories = new HashMap<>();//保存 Transport 模块,调用 NetworkModule.registerTransport()。Transport 负责集群节点之间的 rpc 请求。 (2)Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();//保存 Http 模块,调用 NetworkModule.registerHttpTransport()。HttpServerTransport 负责集群接收用户http请求。 (3)List<TransportInterceptor> transportInterceptors = new ArrayList<>();//保存拦截器,调用 NetworkModule.registerTransportInterceptor()。 同时调用下列方法,从集合中获取 TCP、HTTP的处理类: (1)NetworkModule.getHttpServerTransportSupplier()。 (2)NetworkModule.getTransportSupplier()。 NetworkPlugin: 网络插件,网络模块初始化时加载这些插件,这些插件提供网络服务。默认配置下有四种网络插件 XPackPlugin、Netty4Plugin、Security、VotingOnlyNodePlugin。其中 Netty4Plugin 插件是我们关注的封装了  TCP 和 HTTP 服务实现类。这些服务实现类会被添加到 NetworkModule 的三个集合中。 (1)添加 HttpServerTransport 到集合 transportHttpFactories 中 (2)添加 Transport 到集合 transportFactories 中 (3)添加 TransportInterceptor 到集合 transportInterceptors 中 Netty4Plugin: 网络插件之一,继承自 NetworkPlugin() 接口,其中主要是下列两个接口用于构造网络实现类: (1)getTransports()://返回Map<String, Supplier<Transport>>,然后添加到 transportFactories 集合中。接口返回 Netty4Transport ,以加载 TCP 服务到容器中。key=netty4。 (2)getHttpTransports(); //Map<String, Supplier<HttpServerTransport>>,然后添加到 transportHttpFactories 集合中。接口返回 Netty4HttpServerTransport,以记载 HTTP 服务到容器中。key=netty4。 Security: 网络插件之一,继承自 NetworkPlugin() 接口,其中主要是下列两个接口用于构造网络实现类: (1)getTransports()://返回Map<String, Supplier<Transport>>,然后添加到 transportFactories 集合中。接口返回 SecurityNetty4ServerTransport、SecurityNioTransport 加载 TCP 服务到容器中。key=security4、security-nio。 (2)getHttpTransports(); //Map<String, Supplier<HttpServerTransport>>,然后添加到 transportHttpFactories 集合中。接口返回 SecurityNetty4HttpServerTransport、SecurityNioHttpServerTransport,以记载 HTTP 服务到容器中。key=security4、security-nio。 HTTP服务和TCP服务:
Netty4HttpServerTransport Netty4Transport
封装了 http 的处理器。由 Netty4Plugin.getHttpTransports() 创建。http编解码和处理。 集群节点通信的 tcp 处理器。由 Netty4Plugin.getTransports() 创建。tcp编解码和处理。
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE); ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler()); ch.pipeline().addLast("decoder",HttpRequestDecoder); ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor()); ch.pipeline().addLast("encoder", new HttpResponseEncoder()); ch.pipeline().addLast("encoder", newHttpObjectAggregator()); ch.pipeline().addLast("encoder_compress", new HttpContentCompressor()); ch.pipeline().addLast("request_creator", requestCreator); ch.pipeline().addLast("response_creator", responseCreator); ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler()); ch.pipeline().addLast("response_creator", new Netty4HttpRequestHandler()); ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler());
SecurityNetty4HttpServerTransport:是 Netty4HttpServerTransport 的子类。 SecurityNetty4ServerTransport:是 Netty4Transport 的子类。 SecurityNioTransport:是 NioTransport 子类。 SecurityNioHttpServerTransport:是 NioHttpServerTransport 子类。 TcpTransport: 集群通信TCP客户端处理器,会调用 openConnection() 建立连接,也就是调用前面的 Netty4Transport.initiateChannel() 定义的各个TCP处理器用于集群通信。这里和 NetworkModule 的区别在于,NetworkModule 是定义并加载 Netty4Transport 服务到容器中,而 TcpTransport 是集群启动过程中为了节点之间建立连接来使用前面定义的 Netty4Transport  服务。 内部内 TcpTransport.NodeChannels.sendRequest() 方法是集群所有节点之间TCP消息的发送入口。 Netty4MessageChannelHandler: 集群通信TCP客户端最终的业务消息处理器。在 TcpTransport 建立了节点之间的连接后,节点之间通信的原始二进制数据由 Netty4MessageChannelHandler 来解析和处理。这里就进入了我们熟悉的 Netty 编程。关注 channelRead() 等接口了。 从网络读取的消息会调用 InboundPipeline() 进行处理。也就是 RPC 消息的类型区分接口。 RestController: 该类用于 http 的请求路由,继承自 HttpServerTransport.Dispatcher 接口,在创建 Netty4HttpServerTransport() 时传入,保存到 dispatcher 属性中。 RestController.dispatchRequest() 是所有节点 http 请求的入口。 TransportService: 通信服务,使用上面的各个通信子类来定义通信服务。 ClusterConnectionManager: 管理集群之间的链接。 TcpChannel: 节点之间创建连接的 Channel 的包装类。有多个子类,Netty4TcpChannel。最终调用底层 write() 以发送消息出去。 OutboundHandler 和 InboundPipeline: 前者是 Netty 消息发送的工具,后者是 Netty 消息接受的工具。 Transport.RequestHandlers requestHandlers;    处理其他节点发送过来的集群rpc请求。这个类维护了一个   Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers Transport.ResponseHandlers responseHandlers;    处理其他节点发送过来的集群rpc响应。这个类维护了一个  ConcurrentMapLong<ResponseContext<? extends TransportResponse>> handlers InboundHandler: 这个是节点接收到其他节点TCP数据的处理入口,判断是请求还是响应。如果是响应则调用 handleResponse() 处理。如果是请求则调用 handleRequest() 处理。 ActionModule: 处理 action 操作的处理类注册,有两个方法比较重要。 (1)initRestHandlers() 会将所有  BaseRestHandler() 子类进行注册,用于处理前端请求。 (2)setupActions() 会将所有 action 具体的请求处理类进行注册。
【网络模块初始化】 网络模块的初始化: 该模块同样是 Node 节点对象初始化时执行,加载模块时进行初始化。可以看到注册的 HttpTransport、Transport 根据插件实现类 NetworkPlugin 有不同的实现类: key=netty4 会注册  Netty4HttpServerTransport、Netty4Transport。 key=security4 会注册 SecurityNetty4HttpServerTransport、SecurityNetty4ServerTransport。 key=security-nio 会注册  SecurityNioTransport、SecurityNioHttpServerTransport。 【log】Bootstrap.init() 系统初始化入口   【log】new NetworkModule() 网络模块创建 【log】new Node() 创建当前节点的Node对象,以及 modules 注入 【log】new NetworkModule() 网络模块创建,isTransportClient=false,networkPlugin=4 【log】new NetworkModule() 插件实现类 NetworkPlugin=class org.elasticsearch.transport.Netty4Plugin 【log】Netty4Plugin.getHttpTransports() 返回 Netty4HttpServerTransport 对象,dispatcher=class org.elasticsearch.rest.RestController 【log】NetworkModule.registerHttpTransport() key=netty4 【log】Netty4Plugin.getTransports() 返回 Netty4Transport 对象 【log】NetworkModule.registerTransport() key=netty4 【log】new NetworkModule() 插件实现类 NetworkPlugin=class org.elasticsearch.xpack.security.Security 【log】NetworkModule.registerHttpTransport() key=security4 【log】NetworkModule.registerHttpTransport() key=security-nio 【log】NetworkModule.registerTransport() key=security4 【log】NetworkModule.registerTransport() key=security-nio 【log】NetworkModule.registerTransportInterceptor() 初始化 new NetworkModule() 时会加载各个 NetworkPlugin(),代码如下:
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays,
                     PageCacheRecycler pageCacheRecycler,
                     CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry,
                     NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                     ClusterSettings clusterSettings) 
    this.settings = settings;
    this.transportClient = transportClient;
    // 便利网络插件,也就是网络实现类
    for (NetworkPlugin plugin : plugins) 
        // 添加 HttpServerTransport 到集合 transportHttpFactories 中
        Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
        if (transportClient == false) 
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) 
                registerHttpTransport(entry.getKey(), entry.getValue());
            
        
        // 添加 Transport 到集合 transportFactories 中
        Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
        for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) 
            registerTransport(entry.getKey(), entry.getValue());
        
        // 添加 TransportInterceptor 到集合 transportInterceptors 中
        List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext());
        for (TransportInterceptor interceptor : transportInterceptors) 
            registerTransportInterceptor(interceptor);
        
    

遍历每个  NetworkPlugin 子类,然后调用 getHttpTransports() 、 getTransports()、getTransportInterceptors() 获取插件定义的网络服务类,重点是 Netty4Plugin、Security 两个插件如下: Netty4Plugin 代码,实际上是初始化了 Netty4HttpServerTransport、Netty4Transport 两个对象,初始化后加入集合: @Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                    PageCacheRecycler pageCacheRecycler,
                                                                    CircuitBreakerService circuitBreakerService,
                                                                    NamedXContentRegistry xContentRegistry,
                                                                    NetworkService networkService,
                                                                    HttpServerTransport.Dispatcher dispatcher,
                                                                    ClusterSettings clusterSettings) 
    return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
        () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
            clusterSettings, getSharedGroupFactory(settings)));

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                      CircuitBreakerService circuitBreakerService,
                                                      NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) 
    return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
        networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));

在 Node 初始化加载 modules 过后,会调用下列两个方法从两个集合中取出 TCP 处理类、HTTP 处理类,按照key进行获取: (1)NetworkModule.getHttpServerTransportSupplier()。默认按照 key=security4,获取  SecurityNetty4HttpServerTransport。如果配置按照 key=netty4 则会获取 Netty4HttpServerTransport (2)NetworkModule.getTransportSupplier()。默认按照 key=security4,获取  SecurityNetty4ServerTransport。如果配置按照 key=netty4 则会获取Netty4Transport。 因为 SecurityNetty4ServerTransport 继承自 Netty4Transport,构造函数调用了  super(),会一起初始化父对象和子对象。 调用流程如下: 【log】new Node() 时调用 networkModule.getTransportSupplier().get() 获取 transport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport 【log】NetworkModule.getTransportSupplier() 默认配置 name=security4 【log】Node.newHttpTransport() 从 NetworkModule.transportHttpFactories 集合中获取 HttpServerTransport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport 【log】NetworkModule.getHttpServerTransportSupplier() 默认配置 name=security4 注意在获取服务时会判断默认配置,也就是key: 如果开启了xpack。默认配置是 security4,所有服务是使用的 security4缓存。 如果未开启xpaxk。默认配置是 netty4,所有服务是使用的 netty4缓存。 配置如下: xpack.security.enabled: false 配置判断的代码在  NetworkModule.getHttpServerTransportSupplier()、NetworkModule.getTransportSupplier() 如下:
public Supplier<HttpServerTransport> getHttpServerTransportSupplier() 
    final String name;
    if (HTTP_TYPE_SETTING.exists(settings)) 
        name = HTTP_TYPE_SETTING.get(settings);
     else 
        name = HTTP_DEFAULT_TYPE_SETTING.get(settings);
    
    final Supplier<HttpServerTransport> factory = transportHttpFactories.get(name);
    if (factory == null) 
        throw new IllegalStateException("Unsupported http.type [" + name + "]");
    
    return factory;


【集群RPC】请求响应交互 根据网络模块和网络插件的实现,HTTP在底层的编解码服务实现类是  SecurityNetty4ServerTransport 及其父类 Netty4Transport。 在节点 Node 对象初始化过程中,前面获取到各个 Transport 对象后(SecurityNetty4ServerTransport、SecurityNetty4HttpServerTransport),就开始封装 TransportService 对象。 并初始化 ClusterConnectionManager 用于管理集群链接。后续会调用TcpTransport.initiateConnection() 初始化每个 node 的连接。过程如下: 【log】new ClusterConnectionManager() 初始化集群链接管理器 【log】new TransportService() 初始化集群通信服务 TransportService 对象封装好之后就会调用 doStart()启动,紧接着调用TCP服务并且绑定本地端口才能监听 socket 数据,流程如下 TransportService.doStart()     =>    集群通信服务启动入口 SecurityNetty4ServerTransport.doStart()    =>   同时调用父类的 doStart() Netty4Transport.doStart()    =>    启动网络服务,监听 9300 默认端口。 【log】TransportService.doStart() 启动 【log】SecurityNetty4ServerTransport.doStart() 启动 【log】Netty4Transport.doStart() 启动 【log】TransportService.java|doStart|271|main publish_address 172.16.8.191:9301, bound_addresses 172.16.8.191:9301 集群节点之间创建连接过程,在节点 Node 启动之后,在 Raft 选举流程中需要给其他节点发送 pre-vote 以决定大多数,实际上在 PeerFinder 类中,该类用于集群节点发现,也就是集群节点建立连接的入口: PeerFinder 会根据配置找到另外两个节点,调用 PeerFinder.startProbe(),也就是节点探查,去尝试连接其他节点,此时节点会建立一个连接,因为此时处于节点探查期,只能获取到目标 ipd:port,无法获取到节点 name: 【log】PeerFinder.startProbe() 开始节点探查 【log】TransportService.openConnection() 【log】PeerFinder.establishConnection() 节点发现,创建连接 【log】ClusterConnectionManager.openConnection() 【log】HandshakingTransportAddressConnector.connectToRemoteMasterNode() 【log】ClusterConnectionManager.internalOpenConnection() 初始化集群链接,transport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport 【log】TransportService.openConnection() 【log】TcpTransport.openConnection() 打开节点链接,初始化 List<TcpChannel> ,当前实现类 this=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport 【log】ClusterConnectionManager.openConnection() 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:172.16.8.191:9301,nodeName=,连接个数numConnections=1 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:172.16.8.191:9302,nodeName=,连接个数numConnections=1 因为基于TCP的rpc通信,每个节点都会暴露默认的 9300端口,节点之间相互连接会形成网状结构,节点越多连接指数增长。每个节点只需要保证自己连接到了集群里面其他节点就行。在确定了 Master 之后会提交各种集群任务,此时会在节点建立更多的连接(默认13)不同连接处理不同的请求,节点之间默认全部连接全部成功才成功,一个连接异常会关闭所有连接,因为前面节点探查期间已经建立了连接了,此时获取到了节点名称,以 node-1 连接 node-2/node-3 为例: 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:ypVpKa9lQmula90_RgrW8w,nodeName=node-2,连接个数numConnections=13 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:i6sYVTpdQ6aCJf-C3cAuvQ,nodeName=node-3,连接个数numConnections=13 建立连接,就是调用前面的 SecurityNetty4ServerTransport 服务来初始化 channel,TcpTransport.initiateConnection() 代码如下:
private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,ActionListener<Transport.Connection> listener) 
    int numConnections = connectionProfile.getNumConnections();
    assert numConnections > 0 : "A connection profile must be configured with at least one connection";
    final List<TcpChannel> channels = new ArrayList<>(numConnections);
    // 节点之间会建立默认 numConnections=1或者13 个连接,每个连接不同用途,这里初始化 channel
    for (int i = 0; i < numConnections; ++i) 
        try 
            TcpChannel channel = initiateChannel(node);
            logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: ", channel));
            channels.add(channel);
         catch (ConnectTransportException e) 
            CloseableChannel.closeChannels(channels, false);
            listener.onFailure(e);
            return channels;
         catch (Exception e) 
            CloseableChannel.closeChannels(channels, false);
            listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
            return channels;
        
    
    ChannelsConnectedListener channelsConnectedListener = new ChannelsConnectedListener(node, connectionProfile, channels,
        new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener, false));
    for (TcpChannel channel : channels) 
        channel.addConnectListener(channelsConnectedListener);
    
    TimeValue connectTimeout = connectionProfile.getConnectTimeout();
    threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC);
    return channels;

调用了静态方法 TcpTransport.initiateChannel(),默认的实现类是 SecurityNetty4ServerTransport(x-pack-security.jar)。 但是 SecurityNetty4ServerTransport 未实现 initiateChannel() 方法,所以最终调用的其父类方法 Netty4ServerTransport(transport-netty4-client.jar) 的 initiateChannel() 方法,其中会调用 new ClientChannelInitializer() 为每个连接绑定处理器 Handler。 Netty4Transport.initiateChannel() 如下:
@Override
protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOException 
    InetSocketAddress address = node.getAddress().address();
    Bootstrap bootstrapWithHandler = clientBootstrap.clone();
    // 初始化处理器 handler
    bootstrapWithHandler.handler(getClientChannelInitializer(node));
    bootstrapWithHandler.remoteAddress(address);
    ChannelFuture connectFuture = bootstrapWithHandler.connect();
    Channel channel = connectFuture.channel();
    if (channel == null) 
        ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
        throw new IOException(connectFuture.cause());
    
    Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
    channel.attr(CHANNEL_KEY).set(nettyChannel);
    return nettyChannel;


绑定处理器handler部分,集群 TCP 的 rpc 消息编解码处理类是 Netty4MessageChannelHandler()。
Netty4Transport.ClientChannelInitializer():
protected class ClientChannelInitializer extends ChannelInitializer<Channel> 
    @Override
    protected void initChannel(Channel ch) throws Exception 
        addClosedExceptionLogger(ch);
        assert ch instanceof Netty4NioSocketChannel;
        NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
        ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
        ch.pipeline().addLast("logging", new ESLoggingHandler());
        // using a dot as a prefix means this cannot come from any settings parsed
        ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        super.exceptionCaught(ctx, cause);
    

初始化  Netty4MessageChannelHandler 时,会注入一个 TcpTransport::inboundMessage(),用于处理从  channelRead() 接口在网络中读取来的数据:
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) 
    this.transport = transport;
    final ThreadPool threadPool = transport.getThreadPool();
    final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers();
    // 创建 InboundPipeline
    this.pipeline = new InboundPipeline(
        transport.getVersion(),
        transport.getStatsTracker(),
        recycler,
        threadPool::relativeTimeInMillis,
        transport.getInflightBreaker(),
        requestHandlers::getHandler,
        transport::inboundMessage   // 处理 channelRead() 从网络读取的 bytes 数据
    );

对目标节点的13个连接全部创建 channels 之后,会在 TcpTransport.onResponse() 函数中创建 NodeChannels() 对象来保存目标节点、以及13个 channel: 可以发现连接类型包含 RECOVERY、BULK、REG、STATE、PING 等类型。也就是对应了共五种RPC请求。其中 NodeChannels 对象还提供了方法通过 type 获取缓存的 channel,以及最终的消息发送方法。 【log】TcpTransport.onResponse() 目标节点所有链接创建完毕 channels=13 【log】new NodeChannels() 保存目标节点的所有连接,连接类型: 【log】RECOVERY=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@3e7171ae,  【log】BULK=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@735945e7,  【log】REG=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@44830da3,  【log】STATE=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@5d68ebd0,  【log】PING=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@68210dd2 消息类型分类: recovery:做数据恢复recovery,默认个数2个; bulk:用于bulk请求,默认个数3个; reg:典型的搜索和单doc索引,默认个数6个; state :如集群state的发送等,默认个数1个; ping:就是node之间的ping咯。默认个数1个; 其中缓存的连接类型是 TcpChannel,他是 Channel 包装类,实际上创建的子类型是 Netty4TcpChannel,在节点消息发送过程中会根据请求类型( RECOVERY、BULK、REG、STATE、PING)取出一个 Netty4TcpChannel,最终会调用 channel.write() 发送消息: TransportService.sendRequest()    =>    调用tcp连接 channel 进行消息发送,先调用 TranportService.getConnection() 判断是远端节点还是本地节点。 【log】TransportService.sendRequestInternal() 发送请求入口 【log】TcpTransport.NodeChannels.sendRequest() request=LeaderCheckRequestsender=node-1 【log】NodeChannels.channel() 根据请求类型获取缓存的channel,type=PING 【log】TcpTransport.NodeChannels.sendRequest() 消息类型=PING,获取得到TcpChannel子类=Netty4TcpChannel 【log】OutboundHandler.sendRequest() 向目标节点发送请求的入口 【log】OutboundHandler.internalSend() 调用 channel.sendMessage() 请求发送到其他节点后,其他节点会在 Netty4MessageChannelHandler.channelRead() 获取到消息,调用 InboundPipeline.handleBytes(),最终的处理如下: Netty4MessageChannelHandler.channelRead() 代码如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
    assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
    assert Transports.assertTransportThread();
    assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
    final ByteBuf buffer = (ByteBuf) msg;
    Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
    final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
    try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) 
        pipeline.handleBytes(channel, reference);
    

接下来 InboundPipeline.doHandleBytes() 会将请求进行 decoder.decode()  解码,解码成功后按照请求类型进行分发,解码后的请求会进入  InboundPipeline.forwardFragments() 接口。 InboundPipeline.forwardFragments() 接口如下:
private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException 
    for (Object fragment : fragments) 
        // 如果是 Header 则调用 InboundAggregator aggregator 进行处理
        if (fragment instanceof Header) 
            assert aggregator.isAggregating() == false;
            aggregator.headerReceived((Header) fragment);
        
        // 如果时 PING 则调用 BiConsumer<TcpChannel, InboundMessage> messageHandler 进行处理
        // messageHandler 就是前面初始化注入的 TcpTransport::inboundMessage()
        else if (fragment == InboundDecoder.PING) 
            assert aggregator.isAggregating() == false;
            messageHandler.accept(channel, PING_MESSAGE);
        
        else if (fragment == InboundDecoder.END_CONTENT) 
            assert aggregator.isAggregating();
            try (InboundMessage aggregated = aggregator.finishAggregation()) 
                statsTracker.markMessageReceived();
                messageHandler.accept(channel, aggregated);
            
        
        else 
            assert aggregator.isAggregating();
            assert fragment instanceof ReleasableBytesReference;
            aggregator.aggregate((ReleasableBytesReference) fragment);
        
    

最终消息进入 InboundHandler.messageReceived() 中进行处理,当前节点收到另外两个节点的 leader_heck 请求: InboundHandler.messageReceived()    =>    处理其他节点来到的消息。这里判断是别人发送过来的请求还是响应 InboundHandler.handleRequest()    =>    如果当前消息是其他节点发送过来的请求,则从   requestHandlers.getHandler(action)  根据action 获取指定请求动作的处理器。 RequestHandlerRegistry.processMessageReceived()    =>    请求处理器的注册器,出去里面的请求处理器来处理请求 TransportRequestHandler.messageReceived()    =>    最终的请求处理器对请求进行处理,这里是接口,具体子类很多是匿名的。 【log】InboundHandler.handleRequest() 处理其他节点的请求,requestId=187,action=internal:coordination/fault_detection/leader_check,处理器reg=class org.elasticsearch.transport.RequestHandlerRegistry 【log】InboundHandler.handleRequest() 处理其他节点的请求,requestId=97,action=internal:coordination/fault_detection/leader_check,处理器reg=class org.elasticsearch.transport.RequestHandlerRegistry InboundHandler 的成员变量有两个比较重要的: Transport.RequestHandlers requestHandlers;    处理其他节点发送过来的请求。这个类维护了一个   Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers Transport.ResponseHandlers responseHandlers;    处理其他节点发送过来的响应。这个类维护了一个  ConcurrentMapLong<ResponseContext<? extends TransportResponse>> handlers 这两个是处理器集合,区别在于: RequestHandlers 是处理其他节点发过来的请求,所以缓存的 key 是 action(操作),每个节点提前将不同的 action 的处理器缓存进去,在收到其他节点请求之后直接取出来执行。 ResponseHandlers 是处理其他节点发过来的响应,缓存的 key 是请求编号用于请求和响应对应起来。当前节点发送请求到其他节点后,将请求编号和响应处理器缓存起来等待其他节点返回响应过来,然后根据响应的编号从缓存中取出来执行。 RequestHandlers 注册请求处理器  TransportRequestHandler 过程: 这里注册进去的是 TransportRequestHandler 接口实现类,需要实现 messageReceived() 方法,很多实现类是匿名的,所以无法通过  reg.getHandler().toString() 来打印,但是可以打印 action,这些 action 实际上在代码里面是可以搜索到的。 Transport.RequestHandlers 注册请求处理器代码如下:
synchronized <Request extends TransportRequest> void registerHandler(RequestHandlerRegistry<Request> reg) 
    if (requestHandlers.containsKey(reg.getAction())) 
        throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
    
    requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();

public synchronized <Request extends TransportRequest> void forceRegister(RequestHandlerRegistry<Request> reg) 
    requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();

经过调试发现默认注册了 577 个请求处理器,分为三类 cluster、internal、indices: Transport.ResponseHandlers 注册请求处理器代码如下: 是处理其他节点发过来的响应,缓存的 key 是请求编号用于请求和响应对应起来。所以这个处理器不是提前缓存的,而是在每次发送请求之后将请求id和响应处理进行缓存,在发送请求之前,将响应和requestId进行缓存: TransportService.sendRequestInternal() 代码如下:
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                               final TransportRequest request,
                                                               final TransportRequestOptions options,
                                                               TransportResponseHandler<T> handler) 
    // 省略
    // 将请求的响应处理函数以及请求id 进行缓存,用于后续监听到响应后取出来执行
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
    // 省略
    try 
        // 省略
        // 这里发送请求
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
     catch (final Exception e) 
        // 省略
    

到此为止,请求的发送、请求回来响应的处理已经清楚了,还剩下最后一步,就是节点A发送请求到B,节点A缓存响应处理函数后,节点B对请求进行处理之后需要将响应回传给节点A,然后节点A再根据响应请求id取出响应处理器进行处理。 那么节点B在处理完请求后,如何将响应返回给节点A呢?先看节点B对节点A的请求处理: 以 action=internal:cluster/request_pre_vote 预投票请求为例,由 PreVoteController 在初始化时注册该请求的处理器,处理其他节点发送过来的预投票请求 也就是注册的 PreVoteController.handlePreVoteRequest() 函数来处理其他节点发送过来的请求,该函数返回 PreVoteResponse(TransportResponse子类)。 最后调用 请求节点的 Channel.sendResponse() 将请求处理过后的响应进行回传。实现的子类时 TaskTransportChannel.sendResponse() 。
PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen,
                 final ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) 
    this.transportService = transportService;
    this.startElection = startElection;
    this.updateMaxTermSeen = updateMaxTermSeen;
    this.electionStrategy = electionStrategy;
    this.nodeHealthService = nodeHealthService;
    // 注册 REQUEST_PRE_VOTE_ACTION_NAME 请求的处理
    transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
        PreVoteRequest::new,
        (request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));

此时响应由节点B回传给节点A,那么响应又回到了节点A的 Netty4MessageChannelHandler.channelRead() 中了,同理和前面处理其他节点发送的请求一样,也是根据数据来判断是请求还是响应。按照前面的分析我们已经知道节点收到TCP数据的调用流程: InboundPipeline.doHandleBytes()    =>    会将请求进行 decoder.decode()  解码。 InboundPipeline.forwardFragments()    =>    按照数据类型进行分发。 InboundHandler.messageReceived()    =>    开始处理节点收到其他节点发送的TCP数据。 在最后一步中就会判断其他节点传过来的是请求还是响应,如果是响应则调用 InboundHandler.handleResponse()。并且通过 requestId 将缓存的响应处理器也查找了出来。 InboundHandler.messageReceived() 代码如下:
    private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException 
                // 省略
        ThreadContext threadContext = threadPool.getThreadContext();
        try (ThreadContext.StoredContext existing = threadContext.stashContext()) 
            // 其他节点发送的请求
            if (header.isRequest()) 
                handleRequest(channel, header, message);
            
            // 其他节点发送的响应
            else 
                assert message.isShortCircuit() == false;
                final TransportResponseHandler<?> handler;
                // 获取请求id,根据id从缓存查找响应处理器
                long requestId = header.getRequestId();
                if (header.isHandshake()) 
                    handler = handshaker.removeHandlerForHandshake(requestId);
                 else 
                    TransportResponseHandler<? extends TransportResponse> theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
                    if (theHandler == null && header.isError()) 
                        handler = handshaker.removeHandlerForHandshake(requestId);
                     else 
                        handler = theHandler;
                    
                
                // 如果有响应处理器则调用
                if (handler != null) 
                    final StreamInput streamInput;
                    if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false) 
                        // 省略
                        if (header.isError()) 
                            handlerResponseError(streamInput, handler);
                         else 
                            // 执行响应处理
                            handleResponse(remoteAddress, streamInput, handler);
                        
                     
                
            
         
    

因为前面已经将响应处理器(函数)、响应(参数)都拿到了后续只需要执行就行了。接下来就在 InboundHandler.handleResponse() 中完成的。调用的 InboundHandler.doHandleResponse() InboundHandler.handleResponse() 代码如下:
private <T extends TransportResponse> void doHandleResponse(TransportResponseHandler<T> handler, T response) 
    //处理其他节点传过来的响应数据
    try 
        handler.handleResponse(response);
     catch (Exception e) 
        handleException(handler, new ResponseHandlerFailureTransportException(e));
     finally 
        response.decRef();
    


【集群HTTP】请求响应处理 根据网络模块和网络插件的实现,HTTP在底层的编解码服务实现类是  SecurityNetty4HttpServerTransport 及其父类  Netty4HttpServerTransport。这两个类实现了 Netty 接口提供 HTTP 请求的编解码以及后端服务实现。和 TCP 服务同理,也是调用 doStart() 启动服务监听端口。 流程如下: TransportService.doStart() 启动    =>    集群通信服务服务启动入口 SecurityNetty4HttpServerTransport.doStart()    =>    启动服务监听9200端口,并调用父类的 doStart() Netty4HttpServerTransport.doStart()    =>    监听9200 端口。 【log】TransportService.doStart() 启动 【log】SecurityNetty4ServerTransport.doStart() 启动 【log】Netty4Transport.doStart() 启动 【log】AbstractHttpServerTransport.bindServer() address=/172.16.8.191 【log】AbstractHttpServerTransport.java|bindServer|203|main publish_address 172.16.8.191:9201, bound_addresses 172.16.8.191:9201 然后  Netty4HttpServerTransport.doStart()  在启动过程中会给 Netty 的流水线初始化一个  HttpChannelHandler()。 Netty4HttpServerTransport.doStart() 代码如下:
protected void doStart() 
    boolean success = false;
    try 
        sharedGroup = sharedGroupFactory.getHttpGroup();
        serverBootstrap = new ServerBootstrap();
        // 省略
        // 初始化 HttpChannelHandler
        serverBootstrap.childHandler(configureServerChannelHandler());
        serverBootstrap.handler(new ServerChannelExceptionHandler(this));
        // 省略
        // 绑定本地端口开启 HTTP 服务器
        bindServer();
        success = true;
     finally 
        // 省略
    

HttpChannelHandler() 就是继承了 ChannelInitializer 的流水线初始化工具,在  initChannel() 过程中会在流水线中加入一个  Netty4HttpRequestHandler,这个就是处理 socket 监听的 http 请求处理类。 HttpChannelHandler.initChannel() 代码如下:
protected void initChannel(Channel ch) throws Exception 
    // 注入 netty http 解析的流水线
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
    ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
    final HttpRequestDecoder decoder = new HttpRequestDecoder(
        handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize());
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());
    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);
    if (handlingSettings.isCompression()) 
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    
    ch.pipeline().addLast("request_creator", requestCreator);
    ch.pipeline().addLast("response_creator", responseCreator);
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    // Netty4HttpRequestHandler
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);

Netty4HttpRequestHandler  的  channelRead0() 经过前面流水线编解码处理,这里读取到的消息就是  HttpPipelinedRequest 请求了。会调用  AbstravtHttpServerTransport.incomingRequest() 分发处理。 Netty4HttpRequestHandler.channelRead0() 代码如下:
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) 
    final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
    boolean success = false;
    try 
        // 调用 AbstravtHttpServerTransport.incomingRequest() 处理上来的 http 请求
        serverTransport.incomingRequest(httpRequest, channel);
        success = true;
     finally 
        if (success == false) 
            httpRequest.release();
        
    

前面在 Security 插件获取 Http 服务实现类时传入了 dispatcher,类型是 RestController。这个类现在就起作用了。在  AbstravtHttpServerTransport 处理 Netty 从 socket 读取的 http 请求之后就调用这个类进行请求的分发处理。流程如下: AbstravtHttpServerTransport.incomingRequest()    =>    传入 channelRead() 读取到的  HttpPipelinedRequest 请求。 AbstravtHttpServerTransport.handleIncomingRequest()    =>    请求向后传 AbstravtHttpServerTransport.dispatchRequest()    =>    请求分发 RestController.dispatchRequest()     =>    请求分发处理 也就是说所有的用户 HTTP 请求最终的统一处理入口是 RestController.dispatchRequest()。这里传入的 RestChannel 是 DefaultRestChannel 子类。 然后通过 route、method 获取到处理器之后调用 RestHandler.handleRequest() 接口:
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler,ThreadContext threadContext)
    throws Exception 
    // 省略
    RestChannel responseChannel = channel;
    try 
        // 省略
        // 调用 RestHandler.handleRequest() 接口
        handler.handleRequest(request, responseChannel, client);
     catch (Exception e) 
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
    

调用了不同的 Rest 处理器去处理不同的请求,实际上这些处理器是在另外的模块中进行初始化的。在模块注入过程中注入了 ActionModule 模块到 Guice 容器中。 ActionModule.initRestHandlers() 会将所有  BaseRestHandler() 子类进行注册:
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) 
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = handler -> 
        if (handler instanceof AbstractCatAction) 
            catActions.a

以上是关于网络模块 transport的主要内容,如果未能解决你的问题,请参考以下文章

《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析

python 网络框架twisted基础学习及详细讲解

dubbo源码实践-transport 网络传输层的例子

dubbo源码实践-transport 网络传输层的例子

Dubbo Remoting模块详解

HDU-4280-Island Transport(网络流,最大流, ISAP)