ES Source Code Analysis: Initialization of the Transport Module
Posted by 顧棟
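Initialization of the Transport Module
The source code discussed here is from version 6.7.2.
The transport module is initialized mainly in the Node constructor, which runs when the node starts.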
protected Node(
        final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    logger = LogManager.getLogger(Node.class);
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
        ...
        // Pass the filtered list of ActionPlugin instances to the ActionModule constructor; ActionModule registers
        // and binds the request and handler classes for TCP and HTTP.
        ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
        ...
        final RestController restController = actionModule.getRestController();
        // Pass the filtered list of NetworkPlugin instances to the NetworkModule constructor
        final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
            threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
            networkService, restController);
        ...
        // Obtain the Transport from the network module
        final Transport transport = networkModule.getTransportSupplier().get();
        Set<String> taskHeaders = Stream.concat(
            pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
            Stream.of(Task.X_OPAQUE_ID)
        ).collect(Collectors.toSet());
        // Build the TransportService on top of the network module's Transport
        final TransportService transportService = newTransportService(settings, transport, threadPool,
            networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
        final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
        // Build the SearchTransportService on top of the TransportService
        final SearchTransportService searchTransportService = new SearchTransportService(transportService,
            SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
        final Consumer<Binder> httpBind;
        final HttpServerTransport httpServerTransport;
        // Obtain the HttpServerTransport from the network module, but only when HTTP is enabled (it can be switched off)
        if (networkModule.isHttpEnabled()) {
            httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            };
        } else {
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
            };
            httpServerTransport = null;
        }
        ...
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            logger.debug("initializing HTTP handlers ...");
            // Initialize the mapping between REST requests and their handler classes
            actionModule.initRestHandlers(() -> clusterService.state().nodes());
        }
        logger.info("initialized");
        success = true;
    ...
}
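The constructor above repeatedly calls pluginsService.filterPlugins(...) to pick out the plugins that implement a given interface. The following is a minimal sketch of that idea, not the Elasticsearch implementation: every type name prefixed with Sketch is a hypothetical stand-in, and the filtering logic is an assumption about how such a helper could be written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the plugin base class and the plugin interfaces.
abstract class SketchPlugin {}
interface SketchActionPlugin {}
interface SketchNetworkPlugin {}

class SketchNettyPlugin extends SketchPlugin implements SketchNetworkPlugin {}
class SketchSecurityPlugin extends SketchPlugin implements SketchActionPlugin, SketchNetworkPlugin {}
class SketchAnalysisPlugin extends SketchPlugin {}

class PluginFilterSketch {
    // Return every loaded plugin that also implements the requested interface.
    static <T> List<T> filterPlugins(List<SketchPlugin> plugins, Class<T> type) {
        return plugins.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SketchPlugin> loaded = Arrays.asList(
                new SketchNettyPlugin(), new SketchSecurityPlugin(), new SketchAnalysisPlugin());
        // One plugin implements the action interface, two implement the network interface.
        System.out.println(filterPlugins(loaded, SketchActionPlugin.class).size());
        System.out.println(filterPlugins(loaded, SketchNetworkPlugin.class).size());
    }
}
```

A single plugin can appear in several filtered lists, which is why one plugin can contribute both actions and transports.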
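Initialization of ActionModule
ActionModule is initialized from plugins. Its main jobs are to register the mapping between each Action and its handler class and to create the RestController.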
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    // Set up the actions: register each action and bind it to its handler
    actions = setupActions(actionPlugins);
    // Configure the action filters
    actionFilters = setupActionFilters(actionPlugins);
    autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    Set<String> headers = Stream.concat(
        actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
        Stream.of(Task.X_OPAQUE_ID)
    ).collect(Collectors.toSet());
    // REST handler wrapper: at most one plugin may provide one
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
        UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
        if (newRestWrapper != null) {
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = newRestWrapper;
        }
    }
    mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
        actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
    );
    if (transportClient) {
        restController = null;
    } else {
        // Build the RestController
        restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }
}
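The REST wrapper loop in the constructor enforces that at most one plugin supplies a wrapper. The sketch below isolates that rule so it can be run on its own. SketchRestHandler and SketchWrapperPlugin are hypothetical, heavily simplified stand-ins (the real getRestHandlerWrapper takes a ThreadContext, and the real RestHandler does not return a String).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for a REST handler.
interface SketchRestHandler {
    String handle(String request);
}

// Hypothetical, simplified stand-in for the wrapper-providing part of a plugin.
interface SketchWrapperPlugin {
    // Returns null when the plugin does not want to wrap REST handlers.
    UnaryOperator<SketchRestHandler> getRestHandlerWrapper();
}

class RestWrapperSketch {
    // Pick the single wrapper offered by the plugins; fail if two plugins offer one.
    static UnaryOperator<SketchRestHandler> selectWrapper(List<SketchWrapperPlugin> plugins) {
        UnaryOperator<SketchRestHandler> restWrapper = null;
        for (SketchWrapperPlugin plugin : plugins) {
            UnaryOperator<SketchRestHandler> candidate = plugin.getRestHandlerWrapper();
            if (candidate != null) {
                if (restWrapper != null) {
                    throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
                }
                restWrapper = candidate;
            }
        }
        return restWrapper; // null when no plugin provides a wrapper
    }

    public static void main(String[] args) {
        SketchWrapperPlugin noWrapper = () -> null;
        SketchWrapperPlugin auditing = () -> handler -> request -> "[audited] " + handler.handle(request);

        UnaryOperator<SketchRestHandler> wrapper = selectWrapper(Arrays.asList(noWrapper, auditing));
        SketchRestHandler wrapped = wrapper.apply(request -> "ok: " + request);
        System.out.println(wrapped.handle("GET /_cat/health"));
    }
}
```

Rejecting a second wrapper avoids an ambiguous wrapping order, which matters when the wrapper is used for something like access control.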
Initialization of NetworkModule
When the NetworkModule constructor runs, it populates the following three members from the plugins.
Main data members:
Map<String, Supplier<Transport>> transportFactories
Map<String, Supplier<HttpServerTransport>> transportHttpFactories
List<TransportInterceptor> transportIntercetors
Transport: handles RPC requests between nodes
HttpServerTransport: serves REST requests from clients
TransportInterceptor: transport-layer interceptor
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays,
                     PageCacheRecycler pageCacheRecycler,
                     CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry,
                     NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
    this.settings = settings;
    this.transportClient = transportClient;
    // Iterate over the plugins and register each HttpServerTransport, Transport and TransportInterceptor
    for (NetworkPlugin plugin : plugins) {
        if (transportClient == false && HTTP_ENABLED.get(settings)) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                // Checks that no HTTP transport is created for a TransportClient and that no
                // HttpServerTransport with the same name is already registered
                registerHttpTransport(entry.getKey(), entry.getValue());
            }
        }
        Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
            circuitBreakerService, namedWriteableRegistry, networkService);
        for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
            // Checks that no Transport with the same name is already registered
            registerTransport(entry.getKey(), entry.getValue());
        }
        List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
            threadPool.getThreadContext());
        for (TransportInterceptor interceptor : transportInterceptors) {
            // Register the transport-layer interceptor
            registerTransportInterceptor(interceptor);
        }
    }
}
Once construction is complete, the module exposes these objects through getTransportSupplier, getHttpServerTransportSupplier and getTransportInterceptor.
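To make the registration step concrete, here is a minimal sketch of a name-keyed factory registry that rejects duplicates and hands out a Supplier on request. It is an illustration under assumptions, not the NetworkModule code: SketchTransport is a hypothetical stand-in, the error messages are invented, and passing the transport name as a method argument is a simplification (the real module has no such parameter on getTransportSupplier).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the Transport interface.
interface SketchTransport {
    String name();
}

class TransportRegistrySketch {
    private final Map<String, Supplier<SketchTransport>> transportFactories = new HashMap<>();

    // Register a factory under a unique name; a second registration of the same name is rejected.
    void registerTransport(String key, Supplier<SketchTransport> factory) {
        if (transportFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    // Look up the factory by name; the transport itself is only created when get() is called.
    Supplier<SketchTransport> getTransportSupplier(String name) {
        Supplier<SketchTransport> factory = transportFactories.get(name);
        if (factory == null) {
            throw new IllegalStateException("no transport registered under [" + name + "]");
        }
        return factory;
    }

    public static void main(String[] args) {
        TransportRegistrySketch registry = new TransportRegistrySketch();
        registry.registerTransport("netty4", () -> () -> "netty4");
        SketchTransport transport = registry.getTransportSupplier("netty4").get();
        System.out.println(transport.name());
    }
}
```

Storing suppliers rather than instances means a transport that is registered but never selected is never constructed.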
NetworkPlugin
NetworkPlugin is an interface. Netty4Plugin implements it and also extends Plugin.
Netty4Plugin implements NetworkPlugin's getTransports and getHttpTransports methods, which build Netty4Transport and Netty4HttpServerTransport for transport (TCP) and HTTP communication respectively.
Netty4Transport
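The class diagram shows that Netty4Transport extends TcpTransport, which in turn implements the Transport interface. This is the transport-layer implementation, which controls how data is exchanged at that layer. Netty4Transport implements the abstract doStart method, which starts the TCP service. By default, startup builds both the client side and the server side, relying mainly on the Netty 4 framework.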
@Override
protected void doStart() {
    boolean success = false;
    try {
        ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
        eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
        // Initialize the client side
        clientBootstrap = createClientBootstrap(eventLoopGroup);
        // The server side is enabled by default; initialize it
        if (NetworkService.NETWORK_SERVER.get(settings)) {
            for (ProfileSettings profileSettings : profileSettings) {
                createServerBootstrap(profileSettings, eventLoopGroup);
                bindServer(profileSettings);
            }
        }
        super.doStart();
        success = true;
    } finally {
        if (success == false) {
            doStop();
        }
    }
}
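The doStart method relies on a success flag checked in a finally block, so that a failure part-way through startup still triggers doStop. A small sketch of that idiom follows, with no Netty involved. StartupSketch, its failOnBind switch and the recorded event strings are all hypothetical and exist only to make the control flow observable.

```java
import java.util.ArrayList;
import java.util.List;

class StartupSketch {
    final List<String> events = new ArrayList<>();
    private final boolean failOnBind; // hypothetical switch used to simulate a bind failure

    StartupSketch(boolean failOnBind) {
        this.failOnBind = failOnBind;
    }

    void doStart() {
        boolean success = false;
        try {
            events.add("client bootstrap created");
            if (failOnBind) {
                throw new IllegalStateException("bind failed");
            }
            events.add("server bound");
            success = true;
        } finally {
            if (success == false) {
                doStop(); // release whatever was created before the failure
            }
        }
    }

    void doStop() {
        events.add("stopped");
    }

    public static void main(String[] args) {
        StartupSketch ok = new StartupSketch(false);
        ok.doStart();
        System.out.println(ok.events);

        StartupSketch failing = new StartupSketch(true);
        try {
            failing.doStart();
        } catch (IllegalStateException e) {
            // The exception still propagates; cleanup has already run in the finally block.
            System.out.println(failing.events);
        }
    }
}
```

Compared with catching and rethrowing, the flag keeps the original exception untouched and works for any throwable type.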
Netty4HttpServerTransport
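According to the class diagram, Netty4HttpServerTransport extends AbstractLifecycleComponent and implements HttpServerTransport.
It likewise implements the abstract doStart method, which starts the HTTP server and configures its listening port and request handler. In effect, Netty 4 handles the transport-layer part of the HTTP protocol here.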
@Override
protected void doStart() {
    ...
    // Configure the request handler class, HttpChannelHandler
    serverBootstrap.childHandler(configureServerChannelHandler());
    ...
    // Bind the port that will listen for HTTP requests
    this.boundAddress = createBoundHttpAddress();
    ...
}
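The configureServerChannelHandler method builds an HttpChannelHandler object, whose constructor initializes two member fields: a Netty4HttpServerTransport and a Netty4HttpRequestHandler. When a request arrives, dispatchRequest is called to route it to the matching handler. dispatchRequest is a method of Dispatcher, an interface nested inside the HttpServerTransport interface, and its job is to forward requests. Its main implementation is RestController.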
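The relationship between the HTTP transport and its dispatcher can be sketched as follows. This is a deliberately reduced model under stated assumptions: SketchDispatcher, SketchRestController and SketchHttpTransport are hypothetical, requests are plain strings, and routing is an exact match on method and path, which is much simpler than what RestController does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical simplification of the nested Dispatcher interface; the real method
// works on request and channel objects and returns nothing.
interface SketchDispatcher {
    String dispatchRequest(String method, String path);
}

// Plays the role of RestController: owns the handler table and routes requests.
class SketchRestController implements SketchDispatcher {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerHandler(String method, String path, Function<String, String> handler) {
        handlers.put(method + " " + path, handler);
    }

    @Override
    public String dispatchRequest(String method, String path) {
        Function<String, String> handler = handlers.get(method + " " + path);
        if (handler == null) {
            return "no handler for " + method + " " + path;
        }
        return handler.apply(path);
    }
}

// Plays the role of the HTTP transport: it only knows the Dispatcher interface.
class SketchHttpTransport {
    private final SketchDispatcher dispatcher;

    SketchHttpTransport(SketchDispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    String onRequest(String method, String path) {
        return dispatcher.dispatchRequest(method, path);
    }
}

class DispatcherSketchDemo {
    public static void main(String[] args) {
        SketchRestController controller = new SketchRestController();
        controller.registerHandler("GET", "/_cluster/health", path -> "handled " + path);
        SketchHttpTransport transport = new SketchHttpTransport(controller);
        System.out.println(transport.onRequest("GET", "/_cluster/health"));
        System.out.println(transport.onRequest("GET", "/unknown"));
    }
}
```

Because the transport depends only on the interface, the network layer stays unaware of which REST handlers exist. This matches the way the Node constructor passes restController into NetworkModule as the dispatcher.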
Initialization of TransportService
During node startup, once the Transport has been initialized, a TransportService is built on top of it. The newTransportService method simply calls the TransportService constructor.
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders) {
    this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
        // Create the connection manager
        new ConnectionManager(settings, transport));
}
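ConnectionManager is the class that manages a node's transport connections. Calling connectionManager.connectToNode(node, connectionProfile, connectionValidator(node)); resolves the connection profile and creates the internal connections. The ConnectionManager constructor calls ConnectionProfile.buildDefaultConnectionProfile(settings), which builds the default connection profile from the settings. The connection types are listed in TransportRequestOptions.Type.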
public class TransportRequestOptions {
    ...
    public enum Type {
        RECOVERY, // recovery
        BULK,     // bulk writes
        REG,      // regular requests (the original author was unsure; one use is cluster registration)
        STATE,    // cluster state transfer
        PING      // ping requests
    }
}
The TransportSettings class defines the number of TCP connections of each type. The defaults add up to 13 (2 + 3 + 6 + 1 + 1).
public final class TransportSettings {
    ...
    public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
        intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
        intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
        intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
        intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
    public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
        intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
    ...
}
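As a quick check of the total, the defaults listed above can be summed per connection type. The sketch below hard-codes the five default values from the settings shown. ConnectionCountSketch and its methods are hypothetical names, and it does not read any real Elasticsearch settings.

```java
import java.util.EnumMap;
import java.util.Map;

class ConnectionCountSketch {
    // Mirrors the names in TransportRequestOptions.Type.
    enum Type { RECOVERY, BULK, REG, STATE, PING }

    // Default values copied from the transport.connections_per_node.* settings above.
    static Map<Type, Integer> defaultConnectionsPerNode() {
        Map<Type, Integer> counts = new EnumMap<>(Type.class);
        counts.put(Type.RECOVERY, 2);
        counts.put(Type.BULK, 3);
        counts.put(Type.REG, 6);
        counts.put(Type.STATE, 1);
        counts.put(Type.PING, 1);
        return counts;
    }

    // Total number of connections opened to one remote node.
    static int total(Map<Type, Integer> counts) {
        return counts.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        Map<Type, Integer> counts = defaultConnectionsPerNode();
        counts.forEach((type, n) -> System.out.println(type + " -> " + n));
        System.out.println("total = " + total(counts)); // prints "total = 13"
    }
}
```

Each setting has a minimum of 1 (the third argument to intSetting), so with five types the total can never drop below 5.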
The constructor that the seven-argument TransportService constructor above delegates to additionally accepts the ConnectionManager:
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager)