ElasticSearch深入理解 relocating rebalance 对Elasticsearch集群的影响

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch深入理解 relocating rebalance 对Elasticsearch集群的影响相关的知识,希望对你有一定的参考价值。

在这里插入图片描述

1.概述

转载:深入理解 relocating 对Elasticsearch集群的影响

rebalance 用于将集群中的分片进行均衡,保持各个节点的分片数量大致相等,当集群扩容或缩容,掉一个节点的时候,这过程会自动完成。直观的感觉他应该是在后台默默干活的过程,最多占用带宽和磁盘 io 资源,应该感受不到他的存在,但实际情况是,他可能引起一些意想不到的问题。

这篇文章主要思考分片 relocating 对集群会有哪些影响(基于 v7.7),有下面几个。

shard-started RPC 会抢占较多的 master 处理时间

分片移动结束后,target 节点会向 master 发起一个 shard-started PRC,该 RPC 具有次高优先级:URGENT, master 对该 RPC 的处理比较慢,尤其是在集群分片数量到几万的级别,从而可能导致一些较低优先级的 RPC 长时间来不及执行,例如 put-mapping,进而导致对业务的明显影响。

由于 rebalance,allocation filtering,forced awareness等任意原因产生的 shard-started RPC 都会存在这个问题,例如扩容集群的时候,如果把 rebalance 并发开的比较大,对 master 的处理能力造成明显影响。因此对于分片数较多的集群,当你想要加大 rebalance 和 recovery 并发的时候要考虑到对 master 的影响。

move 一个主分片,对写入流程的影响
当一个主分片被 rebalance 或者手工 move 的时候,可以想象必然存在一个时间段该主分片无法写入。

Elasticsearch 对主分片的 relocating 也是直接 move,不会先将主分片资格让给其他副分片,再进行 move,即便如此,也会存在一个时间点进行切换,无法响应写入。

这个时间段从 RPC 的时序来看的话,如下图所示,红色标记的区域为阻塞写入流程的时间段。在该时间段内,客户端的写入请求会被阻塞无返回,直到这部分处理完成。

在这里插入图片描述

以手工 move 为例,当一个主分片从 node-idea move 到 node1时,主要有以下几个阶段完成:

  • 首先会通过发布新的集群状态,将该分片标记为 RELOCATING状态,更新 routing_table 和 routing_nodes

  • 数据节点收到集群状态后,开始执行 recovery 流程,该流程自 target 节点发送 start_recovery开始,至 Source 阶段返回 start_recovery response 结束。

  • recovery 完成后,target发送 shards-started RPC 给 master 节点,master 节点再次下发集群状态,将该分片标记为 STARTED 状态。

阻塞客户端 bulk 写入的阶段,就在 source 节点执行 finalize 阶段时,准备发送 handoff RPC 时开始,直到收到 master 新的集群状态,将该分片标记为 STARTED,每个节点应用集群状态的时间点略有差异,所以每个节点停止阻塞写入的时间点也会有微小差异。

handoff 的 RPC的作用是告知 target 节点该分片可以切换主分片状态,对 handoff RPC 的处理都是一些计算操作,其中涉及到几个锁,一般会很快完成。

整个主分片的 relocating 过程对写入的影响是很复杂的处理过程,我将他们划分为几个阶段,下面是详细过程。

阻塞过程详细原理

在分片的整个 relocating 过程中,对 target 节点的请求都会被转发到 source 节点,直到 target 节点应用了 master 下发的分片变为 STARTED 的集群状态。

正常写入阶段

起初,写入请求到达 source 节点,像往常一样正常写入到主分片中,直到复制阶段。

复制阶段

此阶段将收到的写请求复制给 target 节点。他从收到 prepare_translog 的 response之后开始,直到 handoff 阶段开始之前。

1.将 target 分片加入到 replication group

收到 prepare_translog response之后,recovery 的 phase1阶段已结束,segments 文件发送完毕,target 节点的 engine 已启动。通过 shard.initiateTracking将 target 分片加入到复制组,后续的写入操作都会复制给 target 节点。

prepareEngineStep.whenComplete(prepareEngineTime -> {
    runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
    final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
});

复制组信息在 ReplicationTracker 的 replicationGroup成员中维护。

2.在写入过程中,将写请求复制给 target

在写入流程的 performOnPrimary#doRun() 函数的 finishRequest方法中,最终调用 ReplicationOperation#handlePrimaryResult函数的 performOnReplicas方法将 index 发送给 target 节点。

handoff阻塞阶段
handoff阶段是为了完成主分片状态的转移,该阶段对收到的写入请求放到队列中,待主分片状态转移完成后继续执行,最多阻塞30分钟。期间对 target 节点的写入也会被转发的 source 节点执行。此阶段从 source 节点收到 finalize 的 response之后开始,直到下一阶段。

1.recovery 过程设置阻塞标记

在 source 节点收到 finalize 的 response之后,通过 handoff RPC 交接主分片状态,这个交接过程通过 indexShardOperationPermits.blockOperations 对此时收到的写入请求进行阻塞,最多30分钟。

 public void relocated(final String targetAllocationId, final Consumer<ReplicationTracker.PrimaryContext> consumer){
        try  {
            indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {//block 的范围就是这段括号里的代码
                final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
                try {//下面调用 RemoteRecoveryTargetHandler.handoffPrimaryContext
                    consumer.accept(primaryContext);//执行这里之后发送和响应了 handoff_primary_context RPC
                } 
            });
        }
    }

indexShardOperationPermits.blockOperations 中的阻塞操作只是 queuedBlockOperations 加1,后面写入流程会检查 queuedBlockOperations 的值是否为0

2.写入流程的处理
产生在写入链路的 acquire函数:

acquire:255, IndexShardOperationPermits (org.elasticsearch.index.shard)
acquirePrimaryOperationPermit:2764, IndexShard (org.elasticsearch.index.shard)
handlePrimaryRequest:256, TransportReplicationAction 

该函数判断 queuedBlockOperations 大于0,于是将他添加到 delayedOperations,写入流程到此结束。delayedOperations中的操作会在本阶段的阻塞过程结束后处理。

失败重试阶段

失败重试阶段自source 节点处理完 handoff response之后开始,直到 master 下发了将分片标记为 STARTED 的集群状态,数据节点应用了集群状态之后结束,在应用这个集群状态之前,分片处于 relocating 状态,期间的写入操作由 source 节点执行,并且会写入失败,抛出 ShardNotInPrimaryModeException 异常,并捕获该异常,等待1分钟后重试写入,再次失败则退出;如果1分钟内收到了新的集群状态,也会重试写入,然后写入成功。

写入流程中,执行 acquire 函数时,发现 queuedBlockOperations为0,执行onAcquired.onResponse(releasable),调用到 wrapPrimaryOperationPermitListener函数时,发现分片已经不是主分片状态,抛出 ShardNotInPrimaryModeException 异常。

if (replicationTracker.isPrimaryMode()) {
    l.onResponse(r);
} else {
    r.close();
    l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}

对于上者返回的异常,写入流程 acquirePrimaryOperationPermit 设置的 Listener会区分异常类型,如果是 ShardNotInPrimaryModeException 异常,则等待1分钟后进行重试。

move 一个副分片,对写入流程的影响
副分片的 move 过程与主分片的 move 过程相似,也是下发两次集群状态,通过 recovery 复制到 target 节点。

需要注意的是,recovery 复制分片到 target 节点,并非从副分片当前所在节点复制到 target,而是从主分片复制到 target。例如,主分片在节点 node-idea 上,副分片在 node-1,当我们从 node-1 move 到 node-2时,recovery 是从 node-idea 复制到 node-2。当 node-1应用主节点发布的分片状态变为 STARTED 的集群状态时,发现分片已经属于自己,于是删除本节点的分片。

与主分片的 relocating 类似的是,在复制阶段,会将正在 recovery 的分片添加到 replication group,这样 replication group 中就有了三个分片,主分片,原副分片,以及 move 到 target 节点的新的副分片。在复制阶段,写入操作会写入到这三个分片。

以手工 move 一个副分片为例,从 node-1 move 到 node-2时,当收到第一个第一个集群状态,分片被标记为 relocating,于是数据从 node-idea recovery 到 node-2。在此期间,新的 index 操作会由主分片节点复制到另外两个节点。如下图所示。

图片

这个过程相当于先增加了一个副分片,执行副分片的 recovery 流程复制到新的 target。

当 recovery 执行完毕,master 下发第二个集群状态,分片被标记为 STARTED,node-1原来持有的分片被删除,如下图所示。

图片

副分片的 relocating 过程不会有 handoff 阶段,整个 relocating 过程如同副分片的 recovery 过程一样,没有对写入进行阻塞的阶段。

以上是关于ElasticSearch深入理解 relocating rebalance 对Elasticsearch集群的影响的主要内容,如果未能解决你的问题,请参考以下文章

深入理解ElasticSearch

ElasticSearch深入理解 relocating rebalance 对Elasticsearch集群的影响

深入理解Elasticsearch的分布式架构

Elasticsearch教程 Elasticsearch查询语法 Elasticsearch权威指南 深入理解Elasticsearch

分布式搜索引擎ElasticSearch---ElasticSearch进阶使用深入理解搜索技术集群架构原理

计算机科学基础知识Relocatable Object File