《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析相关的知识,希望对你有一定的参考价值。
文章目录
一、gateway 模块分析
1.1、简介
gateway 模块负责集群元信息的存储和集群重启时的恢复。
1.2、元数据
ES 中存储的数据有以下几种:
- state 元数据信息
- index Lucene 生成的索引文件
- translog 事务日志
translog:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/0/translog/translog-1.tlog
元数据信息又有以下几种:
- nodes/0/_state/*.st 集群层面元信息;
- nodes/0/indices/{index_uuid}/_state/*.st 索引层面元信息
- nodes/0/indices/{index_uuid}/0/_state/*.st ,分片层面元信息。
集群层面:/Data/apps/search/datas/nodes/0/_state/global-2231.st node-2.st
索引层面:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/_state/state-1.st
分片层面:/Data/apps/search/datas/nodes/0/indices/2A1sfea4RQmGiN3duyF5xw/0/_state/state-1.st
分别对应 ES 中的数据结构:
- MetaData(集群层)主要是 clusterUUID、settings、templates 等
- lndexMetaData(索引层)主要是 numberOfShards、mappings 等
- ShardStateMetaData(分片层)主要是 version、indexUUID、primary 等
上述信息被持久化到磁盘,需要注意的是:持久化的 state 不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠gateway的recovery过程重建RoutingTable。 当读取某个文档时,根据路由算法确定目的分片后,从RoutingTable中查找分片位于哪个节点,然后将请求转发到目的节点。
1.3、元数据的持久化
只有具备Master资格的节点和数据节点可以持久化集群状态。当收到主节点发布的集群状态时,节点判断元信息是否发生变化,如果发生变化,则将其持久化到磁盘中。
GatewayMetaState 类负责接收集群状态,它继承自 ClusterStateApplier, 并实现其中的applyClusterState 方法,当收到新的集群状态时,ClusterApplierService 通知全部 applier 应用该集群状态:
private void callClusterStateAppliers (ClusterChangedEvent clusterChangedEvent) {
//遍历全部的applier,依次调用各模块对集群状态的处理
clusterStateAppliers.forEach (applier -> {
try {
//调用各模块实现的applyClusterState
applier. applyClusterState (clusterChangedEvent);
} catch (Exception ex) {
//某个模块应用集群状态出现异常时打印日志,但应用过程不会终止
logger .warn("failed to notify ClusterStateApplier", ex);
});
}
集群状态的发布应用的详细过程请参考 Cluster 模块分。
节点校验本身资格,判断元信是否发生变化,并将其持久化到磁盘中,全局元信息和索引级元信息都来自集群状态。
public void appl applyClusterState(ClusterChangedEvent event) {
//只有具备Master资格的节点和数据节点才会持久化元数据
if (state.nodes().getLocalNode().isMasterNode() || state.nodes().getLocalNode().isDataNode()) {
//检查全局元信息是否发生变化,如果有变化,则将其写入磁盘
if (previousMetaData == nulll || !MetaData.isGlobalStateEquals(previousMetaData, newMetaData)) {
metaStateService.writeGlobalState ("changed", newMetaData);
}
//将发生变化的元信息写磁盘
for (IndexMetaWriteinfo indexMetaWrite : writeinfo) {
metaStateServ.writeindex(indexMetaWrite.reason, indexMetaWrite.newMetaData);
}
}
}
执行文件写入的过程封装在MetaDataStateFormat类中,全局元信息和索引级元信息的写入都执行三个流程:写临时文件、刷盘、“move”成目标文件。
//临时文件名
final Path tmpStatePath = stateLocation. resolve (fileName + ". tmp");
//目标文件名
final Path finalStatePath = stateLocation. resolve (fileName);
//写临时文件
OutputStreamIndexOutput out = new OutputStreamIndexOutput (Files.newOutputStream(tmpStatePath), ...);
out.writeInt(format.index());
//从系统cache刷到磁盘中,保证持久化
IOUtils.fsync(tmpStatePath, false); // fsync the state file
//move为目标文件,move操作为系统原子操作
Files.move (tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE);
1.4、元数据的恢复
上述的三种元数据信息被持久化存储到集群的每个节点,当集群完全重启(full restart)时,由于分布式系统的复杂性,各个节点保存的元数据信息可能不同。此时需要选择正确的元数据作为权威元数据。gateway的recovery负责找到正确的元数据,应用到集群。
当集群完全重启,达到recovery条件时,进入元数据恢复流程,一般情况下,recovery 条件由以下三个配置控制。
gateway.expected_nodes
预期的节点数。加入集群的节点数(数据节点或具备 Master 资格的节点)达到这个数量即开始 gateway 恢复。默认为 0;gateway.recover_after_time
如果没有达到预期节点数量,恢复程将等待配置的时间,再尝恢复。默认为 5min;gateway.recover_after_nodes
只要配置数量的节点(数据节点或具备 Master 资格的节点)加入群就可以开始恢复;
假设取值10、5min、8,则启动时节点达 10个则即进入recover可;如果一直没达到10个, 5min 超时后如果节点达到8个也进入 recovery。
还有一些更细致配置,原理与上面三个配置类似:
gateway.expected_master_nodes
:预期的具备 Master 资格的节数,加入集群的具备 Master 资格的节点数达到这个数量后立即开始 gateway 恢复。默认为0;gateway.expected_data_nodes
:预期的具备数据节点资格的节点数,加入集群的具备数据节点资格的节点数量达到这个数量后立即开始gatway的恢复。默认为0;gateway.recover_after_master_nodes
:指定数量具备 Master 资格的节点加入集群后就可以开始恢复;gateway.recover_after_data_nodes
:指定数量数据节点加入集群后就可以开始恢复;
当集群完全启动时,gateway模块负责集群层和索引层的元数据恢复,分片层的元数据恢复在allocation模块实现,但是由gateway模块在执行完上述两个层次恢复工作后触发。
当集群级、索引级元数据选举完毕后,执行submitStateUpdateTask提交一个source为local-gateway-elected-state的任务,触发获取shard级元数据的操作,这个Fetch过程是异步的,根据集群分片数量规模,Fetch过程可能比较长,然后submit任务就结束,gateway流程结束。
因此,三个层次的元数据恢复是由gateway模块和allocation 模块共同完成的,在Gateway将集群级、索引级元数据选举完毕后,在submitStateUpdateTask提交的任务中会执行allocation模块的reroute继续后面的流程。
1.5、元数据恢复流程分析
主要实现在GatewayService 类中,它继承自ClusterStateListener,在集群状态发生变化(clusterChanged)时触发,仅由Master节点执行。
Master
选举成功之后,判断其持有的集群状态中是否存在STATE_NOT_RECOVERED_BLOCK
,如果不存在,则说明元数据已经恢复,跳过gateway恢复过程,否则等待。Master
从各个具备Master资格的节点主动获取元数据信息。- 从获取的元数据信息中选择版本号最大的作为最新元数据,包括集群级、索引级。
- 两者确定之后,调用allocation模块的reroute,对未分配的分片执行分配,主分片分配过程中会异步获取各个shard级别元数据,默认超时为13s。
**集群级和索引级元数据信息是根据存储在其中的版本号来选举的,而主分片位于哪个节点却是allocation模块动态计算出来的,先前主分片不一定还被选为新的主分片。**关于主分片选举策略我们在allocation一章中介绍。
1.5.1、选举集群级和索引级别的元数据
判断是否满足进入recovery条件:实现位于GatewayService#clusterChanged,执行此流程的线程为clusterApplierService#updateTask
。
此处省略相关的代码引用。当满足条件时,进入recovery 主要流程:实现位于Gateway#performStateRecovery
;执行此流程的线程为generic。
首先向有Master资格的节点发起请求,获取它们存储的元数据:
//具有Master资格的节点列表
String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
//发送获取请求并等待结果
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
等待回复时,必须收到所有节点的回复,无论回复成功还是失败(节点通信失败异常会被捕获,作为失败处理),此处没有超时。
在收齐的这些回复中,有效元信息的总数必须达到指定数量。异常情况下,例如,某个节点上元信息读取失败,则回复信息中元数据为空。
int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
minimumMasterNodesProvider的值由下面的配置项决定:discovery.zen.minimum_master_nodes
接下来就是通过版本号选取集群级和索引级元数据。
选举集群级元数据:
public void performStateRecovery(...) {
//遍历请求的所有节点
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNode ()) {
//根据元信息中记录的版本号选举元信息
if (electedGlobalState == null) {
electedGlobalState = nodeState . metaData() ;
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
electedGlobalState = nodeState .metaData() ;
}
}
}
选举索引级元数据:
public void performStateRecovery(. ..) {
final Object[] keys = indices.keys;
//遍历集群的全部索引
for (int i = 0; i < keys. length; i++) {
if (keys[i] != null) {
Index index = (Index) keys[i];
IndexMetaData electedIndexMetaData = null;
//遍历请求的全部节点,对特定索引选择版本号最高的作为该索引元数据
for (TransportNodesListGatewayMetaState.NodeGa tewayMetaStatenodeState : nodesState.getNodes()) {
IndexMetaData indexMetaData = nodeState.metaData ().index(index);
if (electedIndexMetaData == null) {
electedIndexMetaData = indexMetaData;
} else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {
electedIndexMetaData = indexMetaData;
}
}
}
}
}
1.6、触发allocation
当上述两个层次的元信息选举完毕,调用clusterService.submitStateUpdateTask 提交一个集群任务,该任务在masterService#updateTask
线程池中执行,实现位于GatewayRecoveryListener#onSuccess
。
主要工作是构建集群状态(ClusterState),其中的内容路由表依赖allocation模块协助完成,调用allocationService.reroute
进入下一阶段:异步执行分片层元数据的恢复,以及分片分配。updateTask线程结束。至此,gateway恢复流程结束,集群级和索引级元数据选举完毕,如果存在未分配的主分片,则分片级元数据选举和分片分配正在进行中。
1.7、思考
元数据信息是根据版本号选举出来的,而元数据写入成功的条件是“多数”,因此,保证进入recovery的条件为节点数量为“多数”,可以保证集群级和索引级的一致性。
获取各节点存储的元数据,然后根据版本号选举时,仅向具有Master资格的节点获取元数据。
关注我的公众号【宝哥大数据】,更多干货
以上是关于《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析的主要内容,如果未能解决你的问题,请参考以下文章
《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化
《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化
《Elasticsearch 源码解析与优化实战》第18章:写入速度优化
《Elasticsearch 源码解析与优化实战》第18章:写入速度优化