网络模块 transport
Posted 0x13
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络模块 transport相关的知识,希望对你有一定的参考价值。
【模块说明】 transport-netty4 modules/transport-netty4: 这个就是网络模块包,需要通过 gradle 打包之后将整个模块包生成的 jar 包复制到数据目录下,通过 PluginsService.getModuleBundles() 进行加载。模块的功能是提供基础的 TCP、HTTP 服务,例如通信等等。具体通信消息的业务逻辑处理不在这个包内。模块目录结构如下:处理HTTP协议编解码: org\\elasticsearch\\http\\netty4\\Netty4HttpChannel.java netty 通道对象 Channel 的包装类 org\\elasticsearch\\http\\netty4\\Netty4HttpPipeliningHandler.java org\\elasticsearch\\http\\netty4\\Netty4HttpRequest.java 封装的 HttpRequest 对象 org\\elasticsearch\\http\\netty4\\Netty4HttpRequestCreator.java org\\elasticsearch\\http\\netty4\\Netty4HttpRequestHandler.java netty 处理 http 请求的 pipline 处理类 org\\elasticsearch\\http\\netty4\\Netty4HttpResponse.java org\\elasticsearch\\http\\netty4\\Netty4HttpResponseCreator.java org\\elasticsearch\\http\\netty4\\Netty4HttpServerChannel.java org\\elasticsearch\\http\\netty4\\Netty4HttpServerTransport.java 核心的 HTTP 服务。 |
处理TCP协议编解码,基础节点通信: org\\elasticsearch\\transport\\CopyBytesServerSocketChannel.java org\\elasticsearch\\transport\\CopyBytesSocketChannel.java org\\elasticsearch\\transport\\netty4\\ESLoggingHandler.java org\\elasticsearch\\transport\\netty4\\Netty4MessageChannelHandler.java 封装的 netty pipline 处理类 org\\elasticsearch\\transport\\netty4\\Netty4TcpChannel.java netty 通道对象 Channel 的包装类,client 端。 org\\elasticsearch\\transport\\netty4\\Netty4TcpServerChannel.java netty 通道对象 Channel 的包装类,server 端。 org\\elasticsearch\\transport\\netty4\\Netty4Transport.java 核心的 TCP 服务,节点之间创建链接。 org\\elasticsearch\\transport\\netty4\\Netty4Utils.java org\\elasticsearch\\transport\\Netty4NiosocketChannel.java |
公共处理: org\\elasticsearch\\transport\\Netty4Plugin.java 网络模块插件的加载入口。 org\\elasticsearch\\transport\\NettyAllocator.java org\\elasticsearch\\transport\\NettyByteBufSizer.java org\\elasticsearch\\transport\\SharedGroupFactory.java |
【主要的类】 NetworkModule: 网络模块 NetworkModule 加载处理集群网络相关的逻辑的实现类的入口。该模块针对 TCP、HTTP 协议进行了封装,封装了 TCP 用于集群保持长连接通信,封装了 HTTP 用于处理各种外部 Rest 请求。如果需要在网络模块中添加自己的网络服务,也是在这里添加。该类有三个集合来保存服务类: (1)Map<String, Supplier<Transport>> transportFactories = new HashMap<>();//保存 Transport 模块,调用 NetworkModule.registerTransport()。Transport 负责集群节点之间的 rpc 请求。 (2)Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();//保存 Http 模块,调用 NetworkModule.registerHttpTransport()。HttpServerTransport 负责集群接收用户http请求。 (3)List<TransportInterceptor> transportInterceptors = new ArrayList<>();//保存拦截器,调用 NetworkModule.registerTransportInterceptor()。 同时调用下列方法,从集合中获取 TCP、HTTP的处理类: (1)NetworkModule.getHttpServerTransportSupplier()。 (2)NetworkModule.getTransportSupplier()。 NetworkPlugin: 网络插件,网络模块初始化时加载这些插件,这些插件提供网络服务。默认配置下有四种网络插件 XPackPlugin、Netty4Plugin、Security、VotingOnlyNodePlugin。其中 Netty4Plugin 插件是我们关注的封装了 TCP 和 HTTP 服务实现类。这些服务实现类会被添加到 NetworkModule 的三个集合中。 (1)添加 HttpServerTransport 到集合 transportHttpFactories 中 (2)添加 Transport 到集合 transportFactories 中 (3)添加 TransportInterceptor 到集合 transportInterceptors 中 Netty4Plugin: 网络插件之一,继承自 NetworkPlugin() 接口,其中主要是下列两个接口用于构造网络实现类: (1)getTransports()://返回Map<String, Supplier<Transport>>,然后添加到 transportFactories 集合中。接口返回 Netty4Transport ,以加载 TCP 服务到容器中。key=netty4。 (2)getHttpTransports(); //Map<String, Supplier<HttpServerTransport>>,然后添加到 transportHttpFactories 集合中。接口返回 Netty4HttpServerTransport,以记载 HTTP 服务到容器中。key=netty4。 Security: 网络插件之一,继承自 NetworkPlugin() 接口,其中主要是下列两个接口用于构造网络实现类: (1)getTransports()://返回Map<String, Supplier<Transport>>,然后添加到 transportFactories 集合中。接口返回 SecurityNetty4ServerTransport、SecurityNioTransport 加载 TCP 服务到容器中。key=security4、security-nio。 (2)getHttpTransports(); //Map<String, Supplier<HttpServerTransport>>,然后添加到 transportHttpFactories 集合中。接口返回 SecurityNetty4HttpServerTransport、SecurityNioHttpServerTransport,以记载 HTTP 服务到容器中。key=security4、security-nio。 HTTP服务和TCP服务:
Netty4HttpServerTransport | Netty4Transport |
封装了 http 的处理器。由 Netty4Plugin.getHttpTransports() 创建。http编解码和处理。 | 集群节点通信的 tcp 处理器。由 Netty4Plugin.getTransports() 创建。tcp编解码和处理。 |
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE); ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler()); ch.pipeline().addLast("decoder",HttpRequestDecoder); ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor()); ch.pipeline().addLast("encoder", new HttpResponseEncoder()); ch.pipeline().addLast("encoder", newHttpObjectAggregator()); ch.pipeline().addLast("encoder_compress", new HttpContentCompressor()); ch.pipeline().addLast("request_creator", requestCreator); ch.pipeline().addLast("response_creator", responseCreator); ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler()); ch.pipeline().addLast("response_creator", new Netty4HttpRequestHandler()); | ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE); ch.pipeline().addLast("logging", new ESLoggingHandler()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler()); |
【网络模块初始化】 网络模块的初始化: 该模块同样是 Node 节点对象初始化时执行,加载模块时进行初始化。可以看到注册的 HttpTransport、Transport 根据插件实现类 NetworkPlugin 有不同的实现类: key=netty4 会注册 Netty4HttpServerTransport、Netty4Transport。 key=security4 会注册 SecurityNetty4HttpServerTransport、SecurityNetty4ServerTransport。 key=security-nio 会注册 SecurityNioTransport、SecurityNioHttpServerTransport。 【log】Bootstrap.init() 系统初始化入口 【log】new NetworkModule() 网络模块创建 【log】new Node() 创建当前节点的Node对象,以及 modules 注入 【log】new NetworkModule() 网络模块创建,isTransportClient=false,networkPlugin=4 【log】new NetworkModule() 插件实现类 NetworkPlugin=class org.elasticsearch.transport.Netty4Plugin 【log】Netty4Plugin.getHttpTransports() 返回 Netty4HttpServerTransport 对象,dispatcher=class org.elasticsearch.rest.RestController 【log】NetworkModule.registerHttpTransport() key=netty4 【log】Netty4Plugin.getTransports() 返回 Netty4Transport 对象 【log】NetworkModule.registerTransport() key=netty4 【log】new NetworkModule() 插件实现类 NetworkPlugin=class org.elasticsearch.xpack.security.Security 【log】NetworkModule.registerHttpTransport() key=security4 【log】NetworkModule.registerHttpTransport() key=security-nio 【log】NetworkModule.registerTransport() key=security4 【log】NetworkModule.registerTransport() key=security-nio 【log】NetworkModule.registerTransportInterceptor() 初始化 new NetworkModule() 时会加载各个 NetworkPlugin(),代码如下:
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings)
this.settings = settings;
this.transportClient = transportClient;
// 便利网络插件,也就是网络实现类
for (NetworkPlugin plugin : plugins)
// 添加 HttpServerTransport 到集合 transportHttpFactories 中
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
if (transportClient == false)
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet())
registerHttpTransport(entry.getKey(), entry.getValue());
// 添加 Transport 到集合 transportFactories 中
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet())
registerTransport(entry.getKey(), entry.getValue());
// 添加 TransportInterceptor 到集合 transportInterceptors 中
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext());
for (TransportInterceptor interceptor : transportInterceptors)
registerTransportInterceptor(interceptor);
遍历每个 NetworkPlugin 子类,然后调用 getHttpTransports() 、 getTransports()、getTransportInterceptors() 获取插件定义的网络服务类,重点是 Netty4Plugin、Security 两个插件如下:
Netty4Plugin 代码,实际上是初始化了 Netty4HttpServerTransport、Netty4Transport 两个对象,初始化后加入集合:
@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings)
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings, getSharedGroupFactory(settings)));
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService)
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
在 Node 初始化加载 modules 过后,会调用下列两个方法从两个集合中取出 TCP 处理类、HTTP 处理类,按照key进行获取:
(1)NetworkModule.getHttpServerTransportSupplier()。默认按照 key=security4,获取 SecurityNetty4HttpServerTransport。如果配置按照 key=netty4 则会获取 Netty4HttpServerTransport
(2)NetworkModule.getTransportSupplier()。默认按照 key=security4,获取 SecurityNetty4ServerTransport。如果配置按照 key=netty4 则会获取Netty4Transport。
因为 SecurityNetty4ServerTransport 继承自 Netty4Transport,构造函数调用了 super(),会一起初始化父对象和子对象。
调用流程如下:
【log】new Node() 时调用 networkModule.getTransportSupplier().get() 获取 transport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport
【log】NetworkModule.getTransportSupplier() 默认配置 name=security4
【log】Node.newHttpTransport() 从 NetworkModule.transportHttpFactories 集合中获取 HttpServerTransport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport
【log】NetworkModule.getHttpServerTransportSupplier() 默认配置 name=security4
注意在获取服务时会判断默认配置,也就是key:
如果开启了xpack。默认配置是 security4,所有服务是使用的 security4缓存。
如果未开启xpaxk。默认配置是 netty4,所有服务是使用的 netty4缓存。
配置如下:
xpack.security.enabled: false
配置判断的代码在 NetworkModule.getHttpServerTransportSupplier()、NetworkModule.getTransportSupplier() 如下:
public Supplier<HttpServerTransport> getHttpServerTransportSupplier()
final String name;
if (HTTP_TYPE_SETTING.exists(settings))
name = HTTP_TYPE_SETTING.get(settings);
else
name = HTTP_DEFAULT_TYPE_SETTING.get(settings);
final Supplier<HttpServerTransport> factory = transportHttpFactories.get(name);
if (factory == null)
throw new IllegalStateException("Unsupported http.type [" + name + "]");
return factory;
【集群RPC】请求响应交互 根据网络模块和网络插件的实现,HTTP在底层的编解码服务实现类是 SecurityNetty4ServerTransport 及其父类 Netty4Transport。 在节点 Node 对象初始化过程中,前面获取到各个 Transport 对象后(SecurityNetty4ServerTransport、SecurityNetty4HttpServerTransport),就开始封装 TransportService 对象。 并初始化 ClusterConnectionManager 用于管理集群链接。后续会调用TcpTransport.initiateConnection() 初始化每个 node 的连接。过程如下: 【log】new ClusterConnectionManager() 初始化集群链接管理器 【log】new TransportService() 初始化集群通信服务 TransportService 对象封装好之后就会调用 doStart()启动,紧接着调用TCP服务并且绑定本地端口才能监听 socket 数据,流程如下 TransportService.doStart() => 集群通信服务启动入口 SecurityNetty4ServerTransport.doStart() => 同时调用父类的 doStart() Netty4Transport.doStart() => 启动网络服务,监听 9300 默认端口。 【log】TransportService.doStart() 启动 【log】SecurityNetty4ServerTransport.doStart() 启动 【log】Netty4Transport.doStart() 启动 【log】TransportService.java|doStart|271|main publish_address 172.16.8.191:9301, bound_addresses 172.16.8.191:9301 集群节点之间创建连接过程,在节点 Node 启动之后,在 Raft 选举流程中需要给其他节点发送 pre-vote 以决定大多数,实际上在 PeerFinder 类中,该类用于集群节点发现,也就是集群节点建立连接的入口: PeerFinder 会根据配置找到另外两个节点,调用 PeerFinder.startProbe(),也就是节点探查,去尝试连接其他节点,此时节点会建立一个连接,因为此时处于节点探查期,只能获取到目标 ipd:port,无法获取到节点 name: 【log】PeerFinder.startProbe() 开始节点探查 【log】TransportService.openConnection() 【log】PeerFinder.establishConnection() 节点发现,创建连接 【log】ClusterConnectionManager.openConnection() 【log】HandshakingTransportAddressConnector.connectToRemoteMasterNode() 【log】ClusterConnectionManager.internalOpenConnection() 初始化集群链接,transport=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport 【log】TransportService.openConnection() 【log】TcpTransport.openConnection() 打开节点链接,初始化 List<TcpChannel> ,当前实现类 this=class org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport 【log】ClusterConnectionManager.openConnection() 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:172.16.8.191:9301,nodeName=,连接个数numConnections=1 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:172.16.8.191:9302,nodeName=,连接个数numConnections=1 因为基于TCP的rpc通信,每个节点都会暴露默认的 9300端口,节点之间相互连接会形成网状结构,节点越多连接指数增长。每个节点只需要保证自己连接到了集群里面其他节点就行。在确定了 Master 之后会提交各种集群任务,此时会在节点建立更多的连接(默认13)不同连接处理不同的请求,节点之间默认全部连接全部成功才成功,一个连接异常会关闭所有连接,因为前面节点探查期间已经建立了连接了,此时获取到了节点名称,以 node-1 连接 node-2/node-3 为例: 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:ypVpKa9lQmula90_RgrW8w,nodeName=node-2,连接个数numConnections=13 【log】TcpTransport.initiateConnection() 初始化连接,nodeId:i6sYVTpdQ6aCJf-C3cAuvQ,nodeName=node-3,连接个数numConnections=13 建立连接,就是调用前面的 SecurityNetty4ServerTransport 服务来初始化 channel,TcpTransport.initiateConnection() 代码如下:
private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,ActionListener<Transport.Connection> listener)
int numConnections = connectionProfile.getNumConnections();
assert numConnections > 0 : "A connection profile must be configured with at least one connection";
final List<TcpChannel> channels = new ArrayList<>(numConnections);
// 节点之间会建立默认 numConnections=1或者13 个连接,每个连接不同用途,这里初始化 channel
for (int i = 0; i < numConnections; ++i)
try
TcpChannel channel = initiateChannel(node);
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: ", channel));
channels.add(channel);
catch (ConnectTransportException e)
CloseableChannel.closeChannels(channels, false);
listener.onFailure(e);
return channels;
catch (Exception e)
CloseableChannel.closeChannels(channels, false);
listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
return channels;
ChannelsConnectedListener channelsConnectedListener = new ChannelsConnectedListener(node, connectionProfile, channels,
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener, false));
for (TcpChannel channel : channels)
channel.addConnectListener(channelsConnectedListener);
TimeValue connectTimeout = connectionProfile.getConnectTimeout();
threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC);
return channels;
调用了静态方法 TcpTransport.initiateChannel(),默认的实现类是 SecurityNetty4ServerTransport(x-pack-security.jar)。
但是 SecurityNetty4ServerTransport 未实现 initiateChannel() 方法,所以最终调用的其父类方法 Netty4ServerTransport(transport-netty4-client.jar) 的 initiateChannel() 方法,其中会调用 new ClientChannelInitializer() 为每个连接绑定处理器 Handler。
Netty4Transport.initiateChannel() 如下:
@Override
protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOException
InetSocketAddress address = node.getAddress().address();
Bootstrap bootstrapWithHandler = clientBootstrap.clone();
// 初始化处理器 handler
bootstrapWithHandler.handler(getClientChannelInitializer(node));
bootstrapWithHandler.remoteAddress(address);
ChannelFuture connectFuture = bootstrapWithHandler.connect();
Channel channel = connectFuture.channel();
if (channel == null)
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
throw new IOException(connectFuture.cause());
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
channel.attr(CHANNEL_KEY).set(nettyChannel);
return nettyChannel;
绑定处理器handler部分,集群 TCP 的 rpc 消息编解码处理类是 Netty4MessageChannelHandler()。
Netty4Transport.ClientChannelInitializer():
protected class ClientChannelInitializer extends ChannelInitializer<Channel>
@Override
protected void initChannel(Channel ch) throws Exception
addClosedExceptionLogger(ch);
assert ch instanceof Netty4NioSocketChannel;
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("logging", new ESLoggingHandler());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
初始化 Netty4MessageChannelHandler 时,会注入一个 TcpTransport::inboundMessage(),用于处理从 channelRead() 接口在网络中读取来的数据:
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport)
this.transport = transport;
final ThreadPool threadPool = transport.getThreadPool();
final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers();
// 创建 InboundPipeline
this.pipeline = new InboundPipeline(
transport.getVersion(),
transport.getStatsTracker(),
recycler,
threadPool::relativeTimeInMillis,
transport.getInflightBreaker(),
requestHandlers::getHandler,
transport::inboundMessage // 处理 channelRead() 从网络读取的 bytes 数据
);
对目标节点的13个连接全部创建 channels 之后,会在 TcpTransport.onResponse() 函数中创建 NodeChannels() 对象来保存目标节点、以及13个 channel:
可以发现连接类型包含 RECOVERY、BULK、REG、STATE、PING 等类型。也就是对应了共五种RPC请求。其中 NodeChannels 对象还提供了方法通过 type 获取缓存的 channel,以及最终的消息发送方法。
【log】TcpTransport.onResponse() 目标节点所有链接创建完毕 channels=13
【log】new NodeChannels() 保存目标节点的所有连接,连接类型:
【log】RECOVERY=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@3e7171ae,
【log】BULK=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@735945e7,
【log】REG=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@44830da3,
【log】STATE=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@5d68ebd0,
【log】PING=org.elasticsearch.transport.ConnectionProfile$ConnectionTypeHandle@68210dd2
消息类型分类:
recovery:做数据恢复recovery,默认个数2个;
bulk:用于bulk请求,默认个数3个;
reg:典型的搜索和单doc索引,默认个数6个;
state :如集群state的发送等,默认个数1个;
ping:就是node之间的ping咯。默认个数1个;
其中缓存的连接类型是 TcpChannel,他是 Channel 包装类,实际上创建的子类型是 Netty4TcpChannel,在节点消息发送过程中会根据请求类型(
RECOVERY、BULK、REG、STATE、PING)取出一个 Netty4TcpChannel,最终会调用 channel.write() 发送消息:
TransportService.sendRequest() => 调用tcp连接 channel 进行消息发送,先调用 TranportService.getConnection() 判断是远端节点还是本地节点。
【log】TransportService.sendRequestInternal() 发送请求入口
【log】TcpTransport.NodeChannels.sendRequest() request=LeaderCheckRequestsender=node-1
【log】NodeChannels.channel() 根据请求类型获取缓存的channel,type=PING
【log】TcpTransport.NodeChannels.sendRequest() 消息类型=PING,获取得到TcpChannel子类=Netty4TcpChannel
【log】OutboundHandler.sendRequest() 向目标节点发送请求的入口
【log】OutboundHandler.internalSend() 调用 channel.sendMessage()
请求发送到其他节点后,其他节点会在 Netty4MessageChannelHandler.channelRead() 获取到消息,调用 InboundPipeline.handleBytes(),最终的处理如下:
Netty4MessageChannelHandler.channelRead() 代码如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
assert Transports.assertTransportThread();
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
final ByteBuf buffer = (ByteBuf) msg;
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release))
pipeline.handleBytes(channel, reference);
接下来 InboundPipeline.doHandleBytes() 会将请求进行 decoder.decode() 解码,解码成功后按照请求类型进行分发,解码后的请求会进入 InboundPipeline.forwardFragments() 接口。
InboundPipeline.forwardFragments() 接口如下:
private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException
for (Object fragment : fragments)
// 如果是 Header 则调用 InboundAggregator aggregator 进行处理
if (fragment instanceof Header)
assert aggregator.isAggregating() == false;
aggregator.headerReceived((Header) fragment);
// 如果时 PING 则调用 BiConsumer<TcpChannel, InboundMessage> messageHandler 进行处理
// messageHandler 就是前面初始化注入的 TcpTransport::inboundMessage()
else if (fragment == InboundDecoder.PING)
assert aggregator.isAggregating() == false;
messageHandler.accept(channel, PING_MESSAGE);
else if (fragment == InboundDecoder.END_CONTENT)
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation())
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
else
assert aggregator.isAggregating();
assert fragment instanceof ReleasableBytesReference;
aggregator.aggregate((ReleasableBytesReference) fragment);
最终消息进入 InboundHandler.messageReceived() 中进行处理,当前节点收到另外两个节点的 leader_heck 请求:
InboundHandler.messageReceived() => 处理其他节点来到的消息。这里判断是别人发送过来的请求还是响应
InboundHandler.handleRequest() => 如果当前消息是其他节点发送过来的请求,则从 requestHandlers.getHandler(action) 根据action 获取指定请求动作的处理器。
RequestHandlerRegistry.processMessageReceived() => 请求处理器的注册器,出去里面的请求处理器来处理请求
TransportRequestHandler.messageReceived() => 最终的请求处理器对请求进行处理,这里是接口,具体子类很多是匿名的。
【log】InboundHandler.handleRequest() 处理其他节点的请求,requestId=187,action=internal:coordination/fault_detection/leader_check,处理器reg=class org.elasticsearch.transport.RequestHandlerRegistry
【log】InboundHandler.handleRequest() 处理其他节点的请求,requestId=97,action=internal:coordination/fault_detection/leader_check,处理器reg=class org.elasticsearch.transport.RequestHandlerRegistry
InboundHandler 的成员变量有两个比较重要的:
Transport.RequestHandlers requestHandlers; 处理其他节点发送过来的请求。这个类维护了一个 Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers
Transport.ResponseHandlers responseHandlers; 处理其他节点发送过来的响应。这个类维护了一个 ConcurrentMapLong<ResponseContext<? extends TransportResponse>> handlers
这两个是处理器集合,区别在于:
RequestHandlers 是处理其他节点发过来的请求,所以缓存的 key 是 action(操作),每个节点提前将不同的 action 的处理器缓存进去,在收到其他节点请求之后直接取出来执行。
ResponseHandlers 是处理其他节点发过来的响应,缓存的 key 是请求编号用于请求和响应对应起来。当前节点发送请求到其他节点后,将请求编号和响应处理器缓存起来等待其他节点返回响应过来,然后根据响应的编号从缓存中取出来执行。
RequestHandlers 注册请求处理器 TransportRequestHandler 过程:
这里注册进去的是 TransportRequestHandler 接口实现类,需要实现 messageReceived() 方法,很多实现类是匿名的,所以无法通过 reg.getHandler().toString() 来打印,但是可以打印 action,这些 action 实际上在代码里面是可以搜索到的。
Transport.RequestHandlers 注册请求处理器代码如下:
synchronized <Request extends TransportRequest> void registerHandler(RequestHandlerRegistry<Request> reg)
if (requestHandlers.containsKey(reg.getAction()))
throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
public synchronized <Request extends TransportRequest> void forceRegister(RequestHandlerRegistry<Request> reg)
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
经过调试发现默认注册了 577 个请求处理器,分为三类 cluster、internal、indices:
Transport.ResponseHandlers 注册请求处理器代码如下:
是处理其他节点发过来的响应,缓存的 key 是请求编号用于请求和响应对应起来。所以这个处理器不是提前缓存的,而是在每次发送请求之后将请求id和响应处理进行缓存,在发送请求之前,将响应和requestId进行缓存:
TransportService.sendRequestInternal() 代码如下:
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler)
// 省略
// 将请求的响应处理函数以及请求id 进行缓存,用于后续监听到响应后取出来执行
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
// 省略
try
// 省略
// 这里发送请求
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
catch (final Exception e)
// 省略
到此为止,请求的发送、请求回来响应的处理已经清楚了,还剩下最后一步,就是节点A发送请求到B,节点A缓存响应处理函数后,节点B对请求进行处理之后需要将响应回传给节点A,然后节点A再根据响应请求id取出响应处理器进行处理。
那么节点B在处理完请求后,如何将响应返回给节点A呢?先看节点B对节点A的请求处理:
以 action=internal:cluster/request_pre_vote 预投票请求为例,由 PreVoteController 在初始化时注册该请求的处理器,处理其他节点发送过来的预投票请求
也就是注册的 PreVoteController.handlePreVoteRequest() 函数来处理其他节点发送过来的请求,该函数返回 PreVoteResponse(TransportResponse子类)。
最后调用 请求节点的 Channel.sendResponse() 将请求处理过后的响应进行回传。实现的子类时 TaskTransportChannel.sendResponse() 。
PreVoteCollector(final TransportService transportService, final Runnable startElection, final LongConsumer updateMaxTermSeen,
final ElectionStrategy electionStrategy, NodeHealthService nodeHealthService)
this.transportService = transportService;
this.startElection = startElection;
this.updateMaxTermSeen = updateMaxTermSeen;
this.electionStrategy = electionStrategy;
this.nodeHealthService = nodeHealthService;
// 注册 REQUEST_PRE_VOTE_ACTION_NAME 请求的处理
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
PreVoteRequest::new,
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));
此时响应由节点B回传给节点A,那么响应又回到了节点A的 Netty4MessageChannelHandler.channelRead() 中了,同理和前面处理其他节点发送的请求一样,也是根据数据来判断是请求还是响应。按照前面的分析我们已经知道节点收到TCP数据的调用流程:
InboundPipeline.doHandleBytes() => 会将请求进行 decoder.decode() 解码。
InboundPipeline.forwardFragments() => 按照数据类型进行分发。
InboundHandler.messageReceived() => 开始处理节点收到其他节点发送的TCP数据。
在最后一步中就会判断其他节点传过来的是请求还是响应,如果是响应则调用 InboundHandler.handleResponse()。并且通过 requestId 将缓存的响应处理器也查找了出来。
InboundHandler.messageReceived() 代码如下:
private void messageReceived(TcpChannel channel, InboundMessage message, long startTime) throws IOException
// 省略
ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext existing = threadContext.stashContext())
// 其他节点发送的请求
if (header.isRequest())
handleRequest(channel, header, message);
// 其他节点发送的响应
else
assert message.isShortCircuit() == false;
final TransportResponseHandler<?> handler;
// 获取请求id,根据id从缓存查找响应处理器
long requestId = header.getRequestId();
if (header.isHandshake())
handler = handshaker.removeHandlerForHandshake(requestId);
else
TransportResponseHandler<? extends TransportResponse> theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && header.isError())
handler = handshaker.removeHandlerForHandshake(requestId);
else
handler = theHandler;
// 如果有响应处理器则调用
if (handler != null)
final StreamInput streamInput;
if (message.getContentLength() > 0 || header.getVersion().equals(Version.CURRENT) == false)
// 省略
if (header.isError())
handlerResponseError(streamInput, handler);
else
// 执行响应处理
handleResponse(remoteAddress, streamInput, handler);
因为前面已经将响应处理器(函数)、响应(参数)都拿到了后续只需要执行就行了。接下来就在 InboundHandler.handleResponse() 中完成的。调用的 InboundHandler.doHandleResponse()
InboundHandler.handleResponse() 代码如下:
private <T extends TransportResponse> void doHandleResponse(TransportResponseHandler<T> handler, T response)
//处理其他节点传过来的响应数据
try
handler.handleResponse(response);
catch (Exception e)
handleException(handler, new ResponseHandlerFailureTransportException(e));
finally
response.decRef();
【集群HTTP】请求响应处理 根据网络模块和网络插件的实现,HTTP在底层的编解码服务实现类是 SecurityNetty4HttpServerTransport 及其父类 Netty4HttpServerTransport。这两个类实现了 Netty 接口提供 HTTP 请求的编解码以及后端服务实现。和 TCP 服务同理,也是调用 doStart() 启动服务监听端口。 流程如下: TransportService.doStart() 启动 => 集群通信服务服务启动入口 SecurityNetty4HttpServerTransport.doStart() => 启动服务监听9200端口,并调用父类的 doStart() Netty4HttpServerTransport.doStart() => 监听9200 端口。 【log】TransportService.doStart() 启动 【log】SecurityNetty4ServerTransport.doStart() 启动 【log】Netty4Transport.doStart() 启动 【log】AbstractHttpServerTransport.bindServer() address=/172.16.8.191 【log】AbstractHttpServerTransport.java|bindServer|203|main publish_address 172.16.8.191:9201, bound_addresses 172.16.8.191:9201 然后 Netty4HttpServerTransport.doStart() 在启动过程中会给 Netty 的流水线初始化一个 HttpChannelHandler()。 Netty4HttpServerTransport.doStart() 代码如下:
protected void doStart()
boolean success = false;
try
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap();
// 省略
// 初始化 HttpChannelHandler
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
// 省略
// 绑定本地端口开启 HTTP 服务器
bindServer();
success = true;
finally
// 省略
HttpChannelHandler() 就是继承了 ChannelInitializer 的流水线初始化工具,在 initChannel() 过程中会在流水线中加入一个 Netty4HttpRequestHandler,这个就是处理 socket 监听的 http 请求处理类。
HttpChannelHandler.initChannel() 代码如下:
protected void initChannel(Channel ch) throws Exception
// 注入 netty http 解析的流水线
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize());
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline().addLast("aggregator", aggregator);
if (handlingSettings.isCompression())
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
// Netty4HttpRequestHandler
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
Netty4HttpRequestHandler 的 channelRead0() 经过前面流水线编解码处理,这里读取到的消息就是 HttpPipelinedRequest 请求了。会调用 AbstravtHttpServerTransport.incomingRequest() 分发处理。
Netty4HttpRequestHandler.channelRead0() 代码如下:
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest)
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
try
// 调用 AbstravtHttpServerTransport.incomingRequest() 处理上来的 http 请求
serverTransport.incomingRequest(httpRequest, channel);
success = true;
finally
if (success == false)
httpRequest.release();
前面在 Security 插件获取 Http 服务实现类时传入了 dispatcher,类型是 RestController。这个类现在就起作用了。在 AbstravtHttpServerTransport 处理 Netty 从 socket 读取的 http 请求之后就调用这个类进行请求的分发处理。流程如下:
AbstravtHttpServerTransport.incomingRequest() => 传入 channelRead() 读取到的
HttpPipelinedRequest 请求。
AbstravtHttpServerTransport.handleIncomingRequest() => 请求向后传
AbstravtHttpServerTransport.dispatchRequest() => 请求分发
RestController.dispatchRequest() => 请求分发处理
。
也就是说所有的用户 HTTP 请求最终的统一处理入口是 RestController.dispatchRequest()。这里传入的 RestChannel 是 DefaultRestChannel 子类。
然后通过 route、method 获取到处理器之后调用 RestHandler.handleRequest() 接口:
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler,ThreadContext threadContext)
throws Exception
// 省略
RestChannel responseChannel = channel;
try
// 省略
// 调用 RestHandler.handleRequest() 接口
handler.handleRequest(request, responseChannel, client);
catch (Exception e)
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
调用了不同的 Rest 处理器去处理不同的请求,实际上这些处理器是在另外的模块中进行初始化的。在模块注入过程中注入了 ActionModule 模块到 Guice 容器中。
ActionModule.initRestHandlers() 会将所有 BaseRestHandler() 子类进行注册:
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster)
List<AbstractCatAction> catActions = new ArrayList<>();
Consumer<RestHandler> registerHandler = handler ->
if (handler instanceof AbstractCatAction)
catActions.a以上是关于网络模块 transport的主要内容,如果未能解决你的问题,请参考以下文章