《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Elasticsearch 源码解析与优化实战》第15章:Transport模块分析相关的知识,希望对你有一定的参考价值。

简介

传输模块用于集群内节点之间的内部通信。从一个节点到另一个节点的每个调用都使用传输模块。例如,当一个节点处理HTTP GET请求时,实际上是由持有该数据的另一个节点处理的,这就需要处理HTTP GET请求的节点将请求通过传输模块转发给另一个节点。

传输机制是完全异步的,这意味着没有阻塞线程等待响应。使用异步通信的好处是解决了C10k问题,也是广播请求/收集结果(例如,ES中的搜索)的理想解决方案。

配置信息

传输模块配置

TCP transport是传输模块基于TCP的实现,有以下配置,如下表所示。

默认情况下,传输模块使用9300端口通信。该端口承载了三种不同的业务:客户端的JavaAPI通信,节点间的内部通信,以及自动发现的通信。使用transport profiles, 可以将三者配置到不同的地址和端口。例如:

transport.profiles.default.port: 9300-9400
transport.profiles.default.bind_host: 10.0.0.1
transport.profiles.client.port: 9500-9600
transport.profiles.client.bind_host: 192.168.0.1
transport.profiles.dmz.port: 9700-9800
transport.profiles.dmz.bind_host: 172.16.1.2

这在部分场景下很有用,例如,想保护集群内部通信的9300端口,只对外开放9200端口,但又不想影响Java API的使用,那么可以为Java API单独配置一个服务端口。

传输模块有一个专用的tracer 日志,当它被激活时,日志系统会记录传入和传出的请求。可以通过动态设置org.elasticsearch.transport.TransportService.tracer为TRACE级别来开启:

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "transient" : {
        "logger.org.elasticsearch.transport.TransportService.tracer" : "TRACE"
    }
}

还可以使用一组include和exclude通配符模式来控制tracer 的内容。默认情况下,除了ping的故障检测请求,所有请求都将被跟踪:

curl -X PUT "localhost:9200/_ cluster/settings" -H 'Content- -Type: application/json' -d'
{
    "transient" : {
        "transport.tracer.include" : "*",
        "transport.tracer.exclude" : "internal:discovery/zen/ fd*"
    }
}

通用网络配置

除了传输模块配置,还有一些其他的网络配置信息。

network.host

节点将绑定到此主机名或IP地址,并将此主机公布到集群中的其他节点。接受一个IP地址、主机名、一个特殊值,或者是这些内容任意组合的数组。默认为_local_

discovery.zen.ping.unicast.hosts

为了加入集群,节点需要知道集群中一些其他节点的主机名或IP地址。此设置提供节点将尝试联系其他节点的初始列表,接受IP地址或主机名。如果主机名被解析为多个IP地址,那么每个IP地址都将用于discovery。轮询DNS也可以在discovery中使用,每次返回一个不同的IP地址,如果IP地址不存在则抛出异常,并在下一轮ping时重新解析(取决于JVM DNS缓存)。默认值为[“127.0.0.1”, “[:1]”]。

http.port

用于接受HTTP请求的绑定端口。接受单个值或范围。如果指定了一个范围,则该节点将绑定到该范围内的第一个可用端口。默认为9200~9300。

transport.tcp.port

为节点之间的通信绑定端口。接受单个值或范围。如果指定了一个范围,则该节点将绑定到该范围内的第一个可用端口。默认为9300~9400。 network.host允许设置以下特殊值,如下表所示。

默认情况下,这些特殊的值在IPv4和IPv6上都可以正常工作,但是也可以使用:ipv4:pv6说明符来明确指定。例如,en0:ipv4_只绑定到en0的IPv4地址。

network.host是常用的设置监听地址的方式,同时设置绑定主机和发布主机。在一些高级用例中,可能需要为它们设置不同值。

network.bind_host

指定节点应该绑定到哪个网络接口,以便监听传入的请求。一个节点可以绑定到多个接口,例如,两个网卡,或者一个站点本地地址和一个回环地址。默认为network.host

network.publish_host

发布主机是节点向集群中发布的单个网口,以便其他节点可以连接它。目前,ES可以绑定到多个地址,但只发布一个地址。如果没有指定,则默认为来自network..host的“最佳”地址。按IPv4/IPv6栈优先排序,然后是可达性。如果为network.host设置多个地址,但在节点到节点的通信中依赖于特定的地址,那么应该显式地设network.publish_host

以上两个配置都可以像network.host一样配置。它们接收IP地址、主机名和特殊值。

基于TCP的所有组件(如HTTP和传输模块)共享以下高级设置,如下表所示。

Transport 总体架构

ES的传输模块和HTTP传输模块都是基于Netty实现的。Netty是一个Java实现的高性能异步网络通信库,基于epoll/kqueue实现事件处理。

我们说的传输模块,目前只有一种实现,就是TCP传输模块。如上节所述,TCP传输模块有三类用处:内部节点间通信(我们称为RPC)、JavaAPI 客户端,以及节点发现。HTTP模块负责服务用户的REST请求。

网络层

网络层是对内部各种传输模块的抽象,使得上层发送/接收数据时不必关心底层的实现,使用Netty 还是其他类库,上层并不关心。 在内部实现中,传输模块和HTTP模块统一封装到NetworkModule类中。顾名思义,该类是在TCP传输模块和HTTP传输模块之上封装的,实现了对各种传输模块的初始化,上层的发送和接收依赖对网络模块的引用。

该类的几个重要数据成员如下表所示。

上述三个成员在NetworkModule的构造函数(节点启动时调用)中通过插件方式加载。

主要对外接口如下表所示。

1. 网络模块初始化

初始化NetworkModule传输模块和HTTP传输模块之后,上层就可以通过该类对外提供的接口获取某个传输模块了。该类返回的各种传输模块和拦截器都是虚类型,因此本质上就是对网络层的一个抽象。

NetworkModule内部组件的初始化是通过插件方式加载的。在其构造函数中传入NetworkPlugin列表,NetworkPlugin 是一个接口类, Netty4Plugin 从这个接口实现,如下图所示。

在Netty4Plugin中,分别构建了Netty4Transport 和Netty4HttpServerTransport,用于传输模块和HTTP传输模块:

public class Netty4Plugin extends Plugin implements NetworkPlugin {
    //构建Netty4Transport作为Transport
    public Map<String, Supplier<Transport>> getTransports(...) {
        return Collections.singletonMap (NETTY_TRANSPORT_NAME, () -> new Netty4Transport (settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
    }
    // 构建Netty4HttpServerTransport MF h HttpServerTransport
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(...) {
        return Collections.singletonMap (NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport (settings, networkService,bigArrays, threadPool, xContentRegistry, dispatcher));
    }
}

根据加载的NetworkPlugin 插件和定义好的REST处理器初始化NetworkModule:

//已注册好的REST请求处理器
final RestController restController = actionModule.getRestController() ;
//初始化Networ kModule
final NetworkModule networkModule = new NetworkModule (settings, false, pluginsService.filterPlugins (NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);

2. Netty4Transport

Netty4Transport用于RPC等内部通信,其继承自TcpTransport, 类结构如下图所示。

在初始化时构建了Client 和Server:

protected void doStart () {
    //初始化Client
    bootstrap = createBootstrap() ;
    if (NetworkService . NETWORK_ SERVER.get (settings)) {
        for(ProfileSettings profileSettings : profileSettings) {
            //初始化Serve
            createServerBootstrap (profileSettings) ;
            bindServer (profileSettings) ;
        }
    }
}

3. Netty4HttpServerTransport

Netty4HttpServerTransport 用于响应REST请求,其继承自HttpServerTransport,如下图所示:

同样在Netty4HttpServerTransport#doStart中创建一个 HTTP Server 监听端口,当收到用户请求时,调用dispatchRequest 对不同的请求执行相应的处理。哪种请求被哪个类处理这种信息注册在ActionModule类中。

服务层

服务层指网络模块的.上层应用层,基于网络模块提供的Transport 来收/发数据。本节重点分析节点间通信,该通信使用TransportService类实现,在网络模块提供的Transport 基础上,该类提供连接到节点、发送数据、注册事件响应函数等方法。其初始化过程如下:

//通过网络模块获取已初始化的Transport
final Transport transport = networkModule.getTransportSupplier().get() ;
//基于网络模块的Transport构建TransportService
final TransportService transportService = newTransportService(settings, transport,threadPool, networkModule.getTransportInterceptor(),localNodeFactory, settingsModule.getClusterSettings());

在节点内部任何通信前,首先需要连接到集群的其他节点。

1. 连接到节点

在默认配置下,ES的每个节点与其他节点都会保持13个长连接。每个连接有各自的用途。可以通过配置调节某种业务使用的连接数。

当本节点连接到某个特定的节点时, TransportService通过网络层提供的transport.connectToNode完成连接。在完成连接后,内部维护一个NodeChannels类对象,表示节点到节点的连接。其中包括多个TCP连接(默认为13个),并记录了每个连接的用途。目前,这些连接有以下几类用途,定义在TransportRequestOptions.Type中。

public enum Type {
    RECOVERY,//用于恢复.
    BULK,//用于批量写入
    REG, //其他用途,例如,加入集群等
    STATE, //传输集群状态
    PING //用 作nodeFD或masterFD的ping请求
}

这些连接被ConnectionProfile统一管理:

static ConnectionProfile buildDe faul tConnect ionProfile (Settings settings) {
    //默认为 2个
    int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
    //默认为3个
    int connections PerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings) ;
    //默认为6个
    int connect ionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings) ;
    //默认为1个
    int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.ge (settings) ;
    //默认为1个
    int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings) ;
    ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
    builder.addConnections (connectionsPerNodeBulk, TransportRequestoptions.Type.BULK);
    // .....
    return builder.build();
}

节点间每种用途的连接数量可以通过以下配置进行调整:

transport.connections_per_node.recovery 
transport.connections_per_node.bulk 
transport.connections_per_node.reg 
transport.connections_per_node.state 
transport.connections_per_node.ping  

NodeChannels类保存了这些连接,当发送数据时,根据请求类型找到对应的连接来发送数据。

public final classNodeChannelsimplementsConnection{
    //保存每个请求对应的连接
    private final Map<TransportRequestoptions.TypeConnectionProfile.ConnectionTypeHandle> typeMapping;
    //保存已建立的TCP连接
    private final List<TcpChannel> channels;
    //目的节点是哪个
    private final DiscoveryNode node;
}

建立连接过程如下,如果13个连接中有一个连接失败,则整体认为失败,关闭已建立的连接。

public final NodeChannels openConnection (DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
    //获取总连接数
    int numConnections = connectionProfile.getNumConnections();
    List<TcpChannel> channels = new ArrayList<> (numConnections);
    for (int i = 0; i < numConnections; ++i) {
        try {
            //建立一个连接.
            TcpChannel channel = initiateChannel (node,connectionProfile.getConnectTimeout(),connectFuture) ;
            channels.add(channel);
        } catch (Exception e) {
            //如果产生异常,则关闭所有连接
            TcpChannel.closeChannels(channels, false);
            throw e ;
        }
    }
    //构建并返回NodeChannels
    nodeChannels = new NodeChannels (node, channels, connectionProfile, version);
    return nodeChannels;
}

2.发送请求

内部的RPC请求通过TransportService#sendRequest发送,在之前的章节中经常可以见到这种发送请求的调用。sendRequest会检查目的节点是否是本节点,如果是本节点,则在sendLocalRequest方法中直接通过action 获取对应的Handler,调用对应的处理函数。简化的实现过程如下:

private void sendLocalRequest(...) {
    final RequestHandlerRegistry reg = getRequestHandler(action);
    reg.processMessageReceived (request, channel) ;
}

当需要发送到网络时,调用asyncSender.sendRequest 执行发送,最终通过TcpTransport.NodeChannels#sendRequest发送数据,先根据请求类型获取相应的连接,某种类型如果有多个连接,例如,BULK请求,则会在多个连接中轮询选择。

public void sendRequest (long requestId, String action, TransportRequest request, TransportRequestoptions options) {
    //通过请求类型获取13个连接中的相应连接
    TcpChannel channel = channel (options. type()) ;
    //发送请求
    sendRequestToChannel (this.node, channel, requestId, action,request, options, getVersion()(byte) 0);
}

3. 定义对Response的处理

当通过TransportService#sendRequest 发送一个RPC请求时,本节点作为RPC客户端,需要同时定义当服务器执行完RPC,返回Response 后,对这个Response如何处理。Transport-ResponseHandler类负责定义对这个Response的处理。在发送请求的sendRequest函数的最后一个参数中定义,例如:

transportService . sendRequest (primaryNode, IN_FLIGHT_OPS_ACTION_NAME,new InFlightOpsRequest(shardId), new TransportResponseHandler<InFlightOpsResponse>() {
    //返回一个新的response
    public InFlightOpsResponse newInstance () {
        return new InFlightOpsResponse() ;
    }
    //对远程节点执行成功的处理
    public void handleResponse (InFlightOpsResponse response) {
        listener.onResponse(response);
    }
    //对远程节点执行失败的处理
    public void handleException (TransportException exp) {
        logger.debug("{} unexpected error while retrieving in flightop count", shardId) ;
        listener . onFailure(exp) ;
    }
    //返回线程池名称
    public String executor() {
        return ThreadPool.Names.SAME ; 
    }
}) ;

TransportResponseHandler类主要定义了对远程节点执行RPC请求后返回成功还是失败处理。

4. 定义对请求的处理

本节点也需要处理来自其他节点的RPC请求,因此需要定义对每个RPC使用哪个模块进行处理。具体参考RPC一节。

REST解析和处理

对REST请求的处理就是定义某个URI应该由哪个模块处理。在ES中, REST请求和RPC请求都称为Action,对REST请求执行处理的类命名规则为Rest* Action。

ActionModule类中注册了某个类对某个REST请求的处理,并对外提供这种映射关系,每个REST处理类继承自RestNodesInfoAction。处理类在ActionModule中的注册过程如下:

public void initRestHandlers (Supplier<DiscoveryNodes> nodesInCluster) {
    registerHandler.accept (new RestMainAction (settings, restController) ) ;
    registerHandler.accept (new RestNodesInfoAction (settings, restController, settingsFilter)) ;
    registerHandler.accept (new RestRemoteClusterInfoAction (settings, restController)) ;
    //省略大部分的注册
}

以RestNodesInfoAction为例,在构造函数中注册对某个URL的处理如下

public Res tNodes InfoAction (Settings settings, RestController controller, SettingsFilter settingsFilter) {
    super (settings) ;
    controller.registerHandler (GET, "/_nodes", this) ;
    controller.registerHandler (GET, "/_nodes/(nodeId]", this) ;
    this.settingsFilter = settingsFilter;
}

同时,每个REST请求处理类需要实现一个prepareRequest 函数,用于在收到请求时,对请求执行验证工作等,当一个请求到来时,网络层调用BaseRestHandler#handleRequest。 在这个函数中,会调用子类的prepareRequest,然后执行这个Action:

public final void handleRequest (RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    //调用子类的prepareRequest
    final RestChannelConsumer action = prepareRequest(request, client) ;
    //执行子类定义的任务
    action.accept(channel);
}

对Action的具体处理定义在处理类的prepareRequest的Lambda表达式中,例如:

public RestChannelConsumer prepareRequest (final RestRequest request, final NodeClient client) throws IOException {
    //省略对请求的预处理
    //Action具体要执行的任务
    return channel -> client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener<> (channel));
}

当Netty收到HTTP请求后,调用Netty4HttpServerTransport#dispatchRequest, 该方法根据 定义好的Action调用对应的Rest*Action处理类。

RPC实现

RPC是远程过程调用的简称,当一个节点需要另一个节点执行某些操作时,例如,创建、删除索引等,向这个节点发送一个RPC请求,ES的RPC基于TCP实现,底层是Netty 的Netty4Transport。每个RPC在内部称为Action,有唯一的名称, 例如,cluster:monitor/main。 当传输模块收到一个RPC请求时,会根据这个Action名称获取对应的处理类。

TransportService类是在网络层之.上对RPC的发送与接收的服务层封装,虽然从模块设计角度来说,网络层的设计对内部是否使用Netty框架是解耦的,除Netty外,也可以使用其他通信框架,但是为了让读者更容易理解,我们看一下从TransportService到Netty4Transport的联系,

如下图所示。

在上图中, Node2调用sendRequest发送请求,发送时传入定义好的TransportResponseHandler,TransportService调用Netty4Transport 的sendRequest 发送数据。当远程节点处理完毕,Netty4Transport的handleResponse方法最终回调发送请求时定义的TransportResponseHandler。

Node1接收请求,通过registerRequestHandler注册Action和对应的处理类TransportRequest-Handler。TransportService 类中维护了Action 与处理类的对应关系。当Netty4Transport 收到请求后,handleRequest方法中调用TransportService 类的getRequestHandler(action)通过(客户端请求中的) Action 获取相应的处理类,最终调用TransportRequestHandler执行对RPC的处理逻辑。

RPC的注册和映射

一个RPC请求与处理模块的对应关系在两方面维护:

  • 在ActionModule类中注册Action与处理类的映射;
  • 通过TransportService#registerRequestHandler方法注册Action名称与对应处理器的映射。

这两种映射关系同时存在。RPC先在ActionModule 类中注册,再调用TransportService#registerRequestHandler在TransportService 类中注册。在大部分情况下,网络层收到请求后根据TransportService注册的Action信息获取对应的处理模块。

1. ActionModule类中的注册

与REST Action 的注册类似,内部RPC也注册在ActionModule类中,描述某个Action应该被哪个类处理。一个Action可以理解为RPC调用的名称。Action 与处理类的映射关系如下:

static Map<String, ActionHandler<?, ?>> setupActions (List<ActionPlugin> actionPlugins) {
    ActionRegistry actions = new ActionRegistry() ;
    actions.register (MainAction.INSTANCE, TransportMainAction.class) ;
    actions.register (NodesInfoAction.INSTANCE,
    TransportNodesInfoAction.class) ;
    actions.register (RemoteIn foAction.INSTANCE,
    TransportRemoteInfoAction.class) ;
    //省略大部分 Action的注册
}

register函数的第-一个参数是名称规则为*Action的类,以比较简单的MainAction为例,其 类结构如下图所示。

这个类主要定义了Action的名称及返回响应,同样以MainAction为例,其实现如下:

public class MainAction extends Action<MainRequest, MainResponse, MainReques tBuilder> {
    //定义Action名称,后续会根据Action名称找到对应的处理类
    public static final String NAME = "cluster:monitor/main";
    public static final MainAction INSTANCE = new MainAction() ;
    public MainRequestBuilder newRequestBuilder (ElasticsearchClient client) {
        return new MainRequestBuilder (client,INSTANCE) ;
    }
    public MainResponse newResponse() {
        return new MainResponse() ;
    }
}

第二个参数是名称规则为Transport*Action的类,在这个类中定义对此Action的具体处理。 以TransportMainAction类为例,其类结构如下图所示。

注意,许多Transport* Action类都会继承自HandledTransportAction。 而在HandledTransport-Action类的构造函数中,会调用TransportServce#registerRequestHandler 在TransportService类中注册处理器。因此,许多在ActionModule类中注册的RPC信息会自动在TransportService中添加映射关系。

以TransportMainAction类为例,其实现如下:

public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {
    public Transpor tMainAction() {
    }
    //定义对此RPC的处理.
    protected void doExecute (MainRequest request, ActionListener<MainResponse> listener) {
    }
}

doExcute函数汇总需要实现最重要的对RPC请求的具体处理逻辑。

2. TransportService类中的注册

在TransportService中注册RPC信息是为了在收到传输层的请求后,通过Action字符串找到对应的处理类。注册过程需要提供两个关键信息: Action 名称与请求处理类(TransportRequestHandler对象)。

在TransportService类中通过registerRequestHandler注册RPC信息的来源可以分为两种,一种 是来自ActionModule的注册,Transport* Action类的父类HandledTransportAction在其构造函数中自动调用registerRequestHandler;另一种 是来自其他模块的直接调用registerRequestHandler,例如,TransportReplicationAction 和MasterFaultDetection。

以HandledTransportAction类在TransportService 中的注册为例,其注册过程如下:

transportService.registerRequestHandler (actionName,request, ThreadPool.Name.SAME,false canTripCircuitBreaker, new TransportHandler());

TransportService将映射维护在一个Map中:

Map<StringRequestHandlerRegistry>

Map的key为Action名称,RequestHandlerRegistry 中封装了与RPC相关的Action名称、处理器等信息。通过它可以找到对应的处理模块。为registerRequestHandler 传入的最后一个参数就是定义的处理器。这个处理器需要从TransportRequestHandler 类继承。如前所述,在TransportService类中注册RPC的时机来源于ActionModule和HandledTransportAction的构造函数,我们以HandledTransportAction构造函数中注册时为例,其处理器定义如下:

class TransportHandler implements Transpor tReques tHandler<Request> {
    //当收到RPC请求
    public final void messageReceived (final Request request, final TransportChannel channel, Task task《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化

《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化

《Elasticsearch 源码解析与优化实战》第18章:写入速度优化

《Elasticsearch 源码解析与优化实战》第18章:写入速度优化

《Elasticsearch 源码解析与优化实战》样章-第 6 章 数据模型

《Elasticsearch 源码解析与优化实战》第9章:Search流程