Elasticsearchelasticsearch allocation 分析

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearchelasticsearch allocation 分析相关的知识,希望对你有一定的参考价值。

在这里插入图片描述

1.概述

转载并且补充:elasticsearch allocation 分析

本文主要分析allocation 模块的结构和原理,然后以集群启动过程为例分析 allocation 模块的工作过程

2. 什么是 allocation

分片分配就是把一个分片指派到集群中某个节点的过程. 分配决策由主节点完成,分配决策包含两方面:

  • 哪些分片应该分配给哪些节点
  • 哪个分片作为主分片,哪些作为副本分片

对于新建索引和已有索引, 分片分配过程也不尽相同. 不过不管哪种场景, ElasticSearch都通过两个基础组件完成工作: allocators和deciders. Allocators尝试寻找最优的节点来分配分片, deciders则负责判断并决定是否要进行这次分配.

2.1 新建索引

对于新建索引, allocators负责找出拥有分片数最少的节点列表, 并按分片数量增序排序, 因此分片较少的节点会被优先选择. 所以对于新建索引, allocators的目标就是以更为均衡的方式为把新索引的分片分配到集群的节点中. 然后deciders依次遍历allocators给出的节点, 并判断是否把分片分配到该节点.

例如, 如果分配过滤规则中禁止节点A持有索引idx中的任一分片, 那么过滤器也阻止把索引idx分配到节点A中, 即便A节点是allocators从集群负载均衡角度选出的最优节点. 需要注意的是allocators只关心每个节点上的分片数, 而不管每个分片的具体大小. 这恰好是deciders工作的一部分, 即阻止把分片分配到将超出节点磁盘容量阈值的节点上.

2.2 已有索引

对于已有索引, 则要区分主分片还是副本分片.

  • 对于主分片, allocators只允许把主分片指定在已经拥有该分片完整数据的节点上.
  • 而对于副本分片, allocators则是先判断其他节点上是否已有该分片的数据的拷贝(即便数据不是最新的). 如果有这样的节点, allocators就优先把把分片分配到这其中一个节点. 因为副本分片一旦分配, 就需要从主分片中进行数据同步, 所以当一个节点只拥分片中的部分时, 也就意思着那些未拥有的数据必须从主节点中复制得到. 这样可以明显的提高副本分片的数据恢复过程.

2.3 疑问

如果新加入了一台机器,然后上面2种都会因为这个新加的机器 内存大,负载小,磁盘存储大,然后都往这个里面写入,这样可能整个集群的压力都在一台新机器上,然后把机器搞挂,这个是怎么处理的呢?

3.触发时机

index 增删
node 增删
手工 reroute
replica数量改变
集群重启

4.allocation 模块结构概述

这个复杂的分配过程在一个叫 reroute 的函数中实现: AllocationService.reroute ,其函数对外有两种重载,

  • 一种是通过接口调用的手工 reroute,
  • 一种是内部模块调用的 reroute
 private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty() :
            "auto-expand replicas out of sync with number of nodes in the cluster";
        assert assertInitialized();

        removeDelayMarkers(allocation);

        allocateExistingUnassignedShards(allocation);  // try to allocate existing shard copies first
        shardsAllocator.allocate(allocation);
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }

4.1 allocators

es 中有以下几个类型的 allocator:

在这里插入图片描述
es 7.8 相关如下
在这里插入图片描述
平衡的类

在这里插入图片描述

Allocator负责为某个特定的shard 分配目的 node。每个Allocator的主要工作是根据某种逻辑得到一个节点列表,然后调用 deciders 去决策,根据决策结果选择一个目的 node。一句话概述每个 Allocator的作用:

  • primaryShardAllocator:找到那些拥有某 shard 最新数据的 node
  • replicaShardAllocator:找到磁盘上拥有这个 shard 数据的 node
  • BalancedShardsAllocator:找到拥有最少 shard 个数的 node

对于这两类 allocator,我的理解是 gatewayAllocator 是为了找到现有 shard,shardsAllocator 是为了分配全新 shard。

4.2 Deciders

es5中有下列类型的决策器:

下面这个集合在测试类中:org.elasticsearch.cluster.ClusterModuleTests#testAllocationDeciderOrder

List<Class<? extends AllocationDecider>> expectedDeciders = Arrays.asList(
            MaxRetryAllocationDecider.class,
            ResizeAllocationDecider.class,
            ReplicaAfterPrimaryActiveAllocationDecider.class,
            RebalanceOnlyWhenActiveAllocationDecider.class,
            ClusterRebalanceAllocationDecider.class,
            ConcurrentRebalanceAllocationDecider.class,
            EnableAllocationDecider.class,
            NodeVersionAllocationDecider.class,
            SnapshotInProgressAllocationDecider.class,
            RestoreInProgressAllocationDecider.class,
            FilterAllocationDecider.class,
            SameShardAllocationDecider.class,
            DiskThresholdDecider.class,
            ThrottlingAllocationDecider.class,
            ShardsLimitAllocationDecider.class,
            AwarenessAllocationDecider.class);

可以看到实现类很多
在这里插入图片描述

他们继承自AllocationDecider,需要实现的接口有:

* canRebalance
* canAllocate  指定 shard 是否可以分配到指定 node
* canRemain
* canForceAllocatePrimary

这些 decider 在org.elasticsearch.cluster.ClusterModule#createAllocationDeciders
中全部添加进去,。

// TODO: this is public so allocation benchmark can access the default deciders...can we do that in another way?
    /** Return a new {@link AllocationDecider} instance with builtin deciders as well as those from plugins. */
    public static Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings,
                                                                         List<ClusterPlugin> clusterPlugins) {
        // collect deciders by class so that we can detect duplicates
        Map<Class, AllocationDecider> deciders = new LinkedHashMap<>();
        addAllocationDecider(deciders, new MaxRetryAllocationDecider());
        addAllocationDecider(deciders, new ResizeAllocationDecider());
        addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider());
        addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
        addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new NodeVersionAllocationDecider());
        addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
        addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
        addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
        addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));

        clusterPlugins.stream()
            .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
            .forEach(d -> addAllocationDecider(deciders, d));

        return deciders.values();
    }

Decider 运行之后可能产生的结果:

ALWAYS、YES、NO、THROTTLE

4.3 类别

这些 decider 大致可以分为以下几类

4.3.1 负载均衡类

SameShardAllocationDecider
避免主副分片分配到同一个节点

AwarenessAllocationDecider
感知分配器,感知服务器、机架等,尽量分散存储shard

有两种参数用于调整:

cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.attributes: zone

ShardsLimitAllocationDecider
同一个节点上允许存在的同一个index的shard数目

4.3.2 并发控制类

ThrottlingAllocationDecider
recovery阶段的限速配置,包括:

cluster.routing.allocation.node_concurrent_recoveries
cluster.routing.allocation.node_initial_primaries_recoveries
cluster.routing.allocation.node_concurrent_incoming_recoveries
cluster.routing.allocation.node_concurrent_outgoing_recoveries

ConcurrentRebalanceAllocationDecider
rebalance并发控制

通过cluster.routing.allocation.cluster_concurrent_rebalance参数控制,可以动态生效

DiskThresholdDecider
根据磁盘空间进行决策的分配器

4.3.3 条件限制类

RebalanceOnlyWhenActiveAllocationDecider
所有shard都处在active状态下,才可以执行rebalance操作

FilterAllocationDecider
通过接口动态设置的过滤器,包括:

index.routing.allocation.require.【必须】
index.routing.allocation.include. 【允许】
index.routing.allocation.exclude.* 【排除】
cluster.routing.allocation.require.*
cluster.routing.allocation.include.*
cluster.routing.allocation.exclude.*

配置的目标为节点 ip 或节点名等。cluster级别设置会覆盖 index 级别设置。

ReplicaAfterPrimaryActiveAllocationDecider

保证只会在主分片分配完毕后才开始分配分片副本。

ClusterRebalanceAllocationDecider

通过集群中active的shard状态来决定是否可以执行rebalance,通过配置cluster.routing.allocation.allow_rebalance控制,可以动态生效:

indices_all_active

当集群所有的节点分配完毕,才认定集群rebalance完成 默认

indices_primaries_active

只要所有主分片分配完毕,就可以认定集群rebalance 完成

always

即使当主分片和分片副本都没有分配,也允许rebalance操作

4.4 内部模块的 reroute

这里以集群启动时 gateway 之后的 reroute 为背景.

reroute中主要实现是两种 Allocator: gateway 和真正数据的

gatewayAllocator.allocateUnassigned(allocation);
shardsAllocator.allocate(allocation);

集群启动时 reroute 的触发时机
第一次触发在gateway 元信息选举完毕后submitStateUpdateTask提交的任务中执行:

allocationService.reroute

然后收集各个节点的 shard 元数据,待某个shard 的 Response 从所有节点 全部返回 后,执行finishHim(),然后对收集到的数据进行处理:

o.e.g.AsyncShardFetch#processAsyncFetch

进而执行:

routingService.reroute("async_shard_fetch");

这个任务被 clusterService 放入队列,等待 gateway 流程结束才执行,当这个 reroute 才会开始执行

5.流程分析

Allocator分为两种:gateway 和真正数据的.其中gateway 又分主分片和副本的Allocator.

gateway的allocate结束之后,才是数据的allocate,主分片allocate结束后才是副本的allocate

5.1 gatewayAllocator

分为主分片和副本的

primaryShardAllocator.allocateUnassigned(allocation); 
replicaShardAllocator.processExistingRecoveries(allocation);
replicaShardAllocator.allocateUnassigned(allocation);

6.主分片分配器

primaryShardAllocator.allocateUnassigned
主分片分配器与副分片分配器都继承自 BaseGatewayShardAllocator,执行相同的allocateUnassigned过程,只是 makeAllocationDecision时,主副分片分配器各自执行自己的策略。

allocateUnassigned的流程是:遍历所有unassigned shard依次处理,通过 decider 决策分配,期间可能需要 fetchData 获取这个 shard 对应的元数据。如果决策结果为 YES,就将其 init

主副分片执行相同的 unassignedIterator.initialize:
创建一个ShardRouting,如果是副分片,会设置recoverySource为 PEER。然后将其添加到集群状态,设置状态已更新

更新的内容大约就是某个 shard 被分配到了某个节点,这个 shard 是主还是副,副的话会设置recoverySource为 PEER,但只是一个类型,并没有告诉节点 recovery的时候从哪个节点去恢复,节点恢复时自己从集群状态中的路由表中查找。

然后, master 把新的集群状态广播下去,当数据节点发现某个分片分配给自己,开始执行分片的 recovery。

public void allocateUnassigned(RoutingAllocation allocation) {
   while (unassignedIterator.hasNext()) {
       final ShardRouting shard = unassignedIterator.next();
       final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shard, allocation, logger);

       if (allocateUnassignedDecision.isDecisionTaken() == false) {
           // no decision was taken by this allocator
           continue;
       }

       if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
           unassignedIterator.initialize(allocateUnassignedDecision.getTargetNode().getId(),
               allocateUnassignedDecision.getAllocationId(),
               shard.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                                 allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
               allocation.changes());
       } else {
           unassignedIterator.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
       }
   }
}

5.2 PrimaryShardAllocator.makeAllocationDecision

这个过程直接返回指定的分片是否可以被分配,如果还没有这个分片的信息,就向集群的其他节点去要,如果已经有了,就根据 decider 进行决策。

首次进入时,还没有任何分片的元信息,发起向集群所有数据节点获取某个 shard 元信息的 fetchData请求

之所以把请求发到所有节点,因为他不知道哪个节点有这个 shard 的数据。集群启动的时候,遍历所有 shard,再对每个 shard 向所有数据节点发 fetchData 虽然整体请求量比较多,但因为这个过程是异步的,执行时间一般还可以接受,主要的时间还是浪费在index的 recovery拖数据的过程,不过个人还是觉得有待改善,可以借鉴 HDFS 上报块状态流程。所以当 es 的集群规模变大,shard 数非常多的时候,这个请求的总量就会很大,

1.向各节点发起fetchData请求

在 AllocaService 的 reroute 中判断是否先进行 gateway 的 allocate:

private void reroute(RoutingAllocation allocation) {
   assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See deassociateDeadNodes";

   // now allocate all the unassigned to available nodes
   if (allocation.routingNodes().unassigned().size() > 0) {
       removeDelayMarkers(allocation);
       gatewayAllocator.allocateUnassigned(allocation);
   }

   shardsAllocator.allocate(allocation);
   assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

然后会进入:gateway.PrimaryShardAllocator#allocateUnassigned

遍历所有 Unassignedshard, 通过 fetchData 挨个获取其元数据.

public void allocateUnassigned(RoutingAllocation allocation) {
   final RoutingNodes routingNodes = allocation.routingNodes();
   final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
   while (unassignedIterator.hasNext()) {
       final ShardRouting shard = unassignedIterator.next();
       final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shard, allocation, logger);
    }
    //...
}

makeAllocationDecision函数是一个虚方法,其实现位于PrimaryShardAllocator.makeAllocationDecision

fetchData会向各个节点请求对应 shard 的元数据,拿到结果才返回,或者13秒超时.在 fetchData 的获取某个 shard 的过程中,不知道这个 shard 在哪个 node 上,因此向集群所有数据 node 发送请求获取这个 shard 的信息.

AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);

在: o.e.g.AsyncShardFetch#asyncFetch ,从 所有数据 node 获取某个特定 shard 的信息

void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
    action.list(shardId, nodes, new ActionListener<BaseNodesResponse<T>>() {
        @Override
        public void onResponse(BaseNodesResponse<T> response) {
        processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
  }

请求节点 shard 元信息的 action 为:

internal:gateway/local/started_shards

遍历发送的过程主要实现位于:

o.e.action.s.n.TransportNodesAction.AsyncAction#start

2.数据节点的响应模块

对端对此响应的模块为:

gateway.TransportNodesListGatewayStartedShards#nodeOperation

其中读取本地 shard 元数据,返回请求方.

3.收集返回结果并处理

对于一个特定shard,当response达到期望数量(发出请求时的节点数)时执行finishHim,调用到处理模块:

gateway.AsyncShardFetch#processAsyncFetch

在这里实现收到各节点返回的 shard 级别元数据的对应处理,将 response 信息放到this.cache,下次 reroute,的时候从 cache 里取.然后 再次执行 reroute

reroute(shardId, "post_response");
接着调用到-->
routingService.reroute("async_shard_fetch");

主要实现是对当前ClusterState 提交一个任务,再次执行 allocationService.reroute,当前ClusterState可能是一个,也可能是多个

 protected void performReroute(final String reason) {
    clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) {
        @Override
        public ClusterState execute(ClusterState currentState) {
            rerouting.set(false);
            RoutingAllocation.Result routingResult = allocationService.reroute(currentState, reason);
            return ClusterState.builder(currentState).routingResult(routingResult).build();
        }
    });
} 

shard 元数据有了,再次进入主分片分配过程

4.主分片选举实现

如果读者还没有 allocation id 的概念,先看一下这篇翻译的文章,然后就会了解到主分片决策的原理,es5和 es2中的机制是不一样的。

reroute 再次回到 makeAllocationDecision 函数,从inSyncAllocationIds集合找到活跃的 shard,得到一个列表,把这个列表中的节点依次执行 decider,决策结果:

/**
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
                                            List<NodeGatewayStartedShards> nodeShardStates,
                                            ShardRouting shardRouting,
                                            boolean forceAllocate) {
   List<DecidedNode> yesNodeShards = new ArrayList<>();
   List<DecidedNode> throttledNodeShards = new ArrayList<>();
   List<DecidedNode> noNodeShards = new ArrayList<>();
   for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
       RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
       if (node == null) {
           continue;
       }

       Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
                                           allocation.deciders().canAllocate(shardRouting, node, allocation);
       DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
       if (decision.type() == Type.THROTTLE) {
           throttledNodeShards.add(decidedNode);
       } else if (decision.type() == Type.NO) {
           noNodeShards.add(decidedNode);
       } else {
           yesNodeShards.add(decidedNode);
       }
   }
   return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards),
                                 Collections.unmodifiableList(noNodeShards));
}

具体的决策过程会依次遍历执行全部14个 decider:

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
   if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
       return Decision.NO;
   }
   Decision.Multi ret = new Decision.Multi();
   for (AllocationDecider allocationDecider : allocations) {
       Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
       // short track if a NO is returned.
       if (decision == Decision.NO) {
           

(c)2006-2024 SYSTEM All Rights Reserved IT常识