分片和副本 allocation

Posted 0x13

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分片和副本 allocation相关的知识,希望对你有一定的参考价值。

什么是 allocation:

因为 lucene 是单机索引库以及存储引擎,ES在 lucene基础上加入了集群也就是进行了分布式封装,数据通过 shard 分片分散到不同的节点,并且提供了 replica 来备份 shard。而 allocation 就是 ES 管理分片的分配器并决策应该如何分片,不同分片保存在集群中哪个节点上,哪个作为主分片、哪个作为副分片。

触发 allocation 的时机:

(1)索引的增加和删除 (2)集群节点的增加和删除 (3)手动reroute指定分片到某个节点 (4)索引修改了replica (5)集群重启        

allocation模块的结构:

(1)Allocators 分配器,分配器持有多个决策器。作用是查找最优的节点。 (2)Decider 决策器。作用是判断是否进行本次分配。  

Allocators 种类:

首先看分配器的种类,第一种: BalancedShardsAllocator    继承自 ShardsAllocator 接口。用于查找 shard 数最少的节点。 ReplicaShardAllocator    继承自 BaseGatewayShardAllocator 类。用于查找有某个 shard 的节点。 子类是  InternalPrimaryShardAllocator()。是 GatewayAllocator 的内部类并创建。 PrimaryShardAllocator   继承自 BaseGatewayShardAllocator 类。用于查找有某个 shard 最新数据的节点。 子类是 InternalReplicaShardAllocator()。是 GatewayAllocator 的内部类并创建。

BalancedShardsAllocator 初始化流程:

负责判断负载来判断划分的某个 node。 ClusterModule.createShardsAllocator() new BalancedShardsAllocator() new AllocationService()    创建 AllocationService() 对象,这个是提供分片分配的入口服务。

GatewayAllocator 初始化流程:

负责通过已知 shard 查找划分的某个 node。 new Node()    节点对象创建时模块注入之前。 clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));    通过Guice获取对象实例,并提交给 ClusterModule。 ClusterModule.setExistingShardsAllocators()    会将 GatewayAllocator 进一步提交给 AllocationsService AllocationService.setExistingShardsAllocators()    提交给 AllocationsService

Decider 初始化流程:

会调用 ClusterModule.addAllocationDecider() 初始化所有 Decider,得到 Decider 集合集合后,构造 AllocationDeciders, Collection<AllocationDecider> 保存所有决策器,流程如下:
【log】ClusterModule.createAllocationDeciders() 添加所有 Decider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
【log】ClusterModule.addAllocationDecider() clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
【日志】new AllocationDeciders() 传入的AllocationDecider总数:24
可以发现一共注册了24个决策器,不同决策器作用如下:
MaxRetryAllocationDecider
ResizeAllocationDecider
ReplicaAfterPrimaryActiveAllocationDecider    保证只会在主shard分配完毕后再对副shard进行分配。
RebalanceOnlyWhenActiveAllocationDecider    当所有shard都处在active状态时,才能执行 rebalance。
ClusterRebalanceAllocationDecider    通过集群汇总active状态的shard来决定是否可以执行rebalance。
ConcurrentRebalanceAllocationDecider
EnableAllocationDecider
NodeVersionAllocationDecider
SnapshotInProgressAllocationDecider
RestoreInProgressAllocationDecider
NodeShutdownAllocationDecider
FilterAllocationDecider    通过接口,动态设置某个node分配策略时(必须、允许、排除),对这个node进行过滤。
SameShardAllocationDecider    避免主副shard都分配到一个节点。
DiskThresholdDecider        根据磁盘空间决策shard应该分配到某个node。
ThrottlingAllocationDecider       并发控制,在 recovery 限速控制。
ShardsLimitAllocationDecider    同一个节点限值同一个index的shard数量。
AwarenessAllocationDecider
DataTierAllocationDecider
CcrPrimaryFollowerAllocationDecider
SearchableSnapshotAllocationDecider
SearchableSnapshotRepositoryExistsAllocationDecider
SearchableSnapshotEnableAllocationDecider
HasFrozenCacheAllocationDecider
DedicatedFrozenNodeAllocationDecider
上面的调用实际上最终是为了生成  AllocationServic   对象: ClusterModule 构造函数:
public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
                     ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, ThreadContext threadContext,
                     SystemIndices systemIndices) 
    this.clusterPlugins = clusterPlugins;
    this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
    // 创建 AllocationDeciders 里面是所有决策器
    this.allocationDeciders = new AllocationDeciders(deciderList);
    // 创建 BalancedShardsAllocator
    this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.clusterService = clusterService;
    this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, systemIndices);
    // 创建 AllocationService
    // 提交 AllocationDeciders、BalancedShardsAllocator
    // 在初始化 Node() 对象时会给 AllocationService 提交 GatewayAllocator 实例
    // 最终 AllocationService 引用了 AllocationDeciders、BalancedShardsAllocator、GatewayAllocator
    this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
    this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);

AllocationServic   分配服务:

前面创建的两种决策器、分配器等等所有对象最终都会提交给 AllocationServic  这个服务。都保存在几个属性中。创建 AllocationServic.showAllocatorsInfo() 函数可以获取分配模块的详细信息,在 Node.new() 构造函数最后调用它: AllocationDeciders allocationDeciders;    保存所有决策器 Map<String, ExistingShardsAllocator> existingShardsAllocators;    保存 GatewayAllocator 分配器 ShardsAllocator shardsAllocator;    保存 ShardsAllocator 分配器
【log】AllocationService.showAllocatorsInfo() 输出 allocators 信息
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.HasFrozenCacheAllocationDecider
【log】AllocationService.allocationDeciders clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.decider.DedicatedFrozenNodeAllocationDecider
【log】AllocationService.shardsAllocator clazz=class org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator
【log】AllocationService.existingShardsAllocators key=gateway_allocator,clazz=class org.elasticsearch.gateway.GatewayAllocator
【log】AllocationService.existingShardsAllocators key=searchable_snapshot_allocator,clazz=class org.elasticsearch.xpack.searchablesnapshots.allocation.SearchableSnapshotAllocator
入口函数 AllocationService.reroute() 代码如下: 会触发 reroute 流程,获取到 reroute 结果,如果涉及到集群状态变更则会由上层调用的 MasterService 将状态广播出去。注意触发条件是集群提交任务,所以这个只在 master 节点执行。 当其他节点收到 reroute 结果,也就是一个node收到新集群状态发现自己被分配了某个 shard 时,这个节点会自动进入shard 的 recovery 流程。
public CommandsResult reroute(final ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) 
    RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime());
    allocation.debugDecision(true);
    allocation.ignoreDisable(true);
    if (retryFailed) 
        resetFailedAllocationCounter(allocation);
    
    RoutingExplanations explanations = commands.execute(allocation, explain);
    // we revert the ignore disable flag, since when rerouting, we want the original setting to take place
    allocation.ignoreDisable(false);
    // 执行 reroute
    reroute(allocation);
    // 返回新的集群状态到 TransportClusterRerouteAction.execute() 中
    return new CommandsResult(explanations, buildResultAndLogHealthChange(clusterState, allocation, "reroute commands"));
会进入重载的reroute函数调用 GatewayAllocator、BalancedShardsAllocator 进行处理: //核心逻辑是根据上述规则&分片权重(index、cluster)进行位置判断,然后进行数据移动、移动结束初始化启动、最后调整clusterstate完成分配。
private void reroute(RoutingAllocation allocation) 
    // 参数时 RoutingAllocation
    // 该类持有当前集群shard分配的状态信息、决策信息、节点信息等,在后面的分配过程的主要操作类。
    assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
    assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty() :
        "auto-expand replicas out of sync with number of nodes in the cluster";
    assert assertInitialized();
    removeDelayMarkers(allocation);
    // 先调用 GatewayAllocator 查询现有未分配的shard
    allocateExistingUnassignedShards(allocation);  
    // 再调用 BalancedShardsAllocator 根据节点负载找一个node进行分配
    shardsAllocator.allocate(allocation);
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
其中传入的参数 allocation ,该类持有当前集群shard分配的状态信息、决策信息、节点信息等,在后面的分配过程的主要操作类。 RoutingAllocation allocation:创建如下: RoutingAllocation allocation = new RoutingAllocation(     allocationDeciders,    // 取 AllocationService 的裁决器     routingNodes,        //      fixedClusterState,     clusterInfoService.getClusterInfo(),    // 集群信息,里面有所有index、shard 信息     snapshotsInfoService.snapshotShardSizes(),     currentNanoTime() );

GatewayAllocator 的处理过程:

AllocationService.reroute()    -> AllocationService.allocateExistingUnassignedShards()    ->
private void allocateExistingUnassignedShards(RoutingAllocation allocation) 
    // 将集群信息中的所有未分配 shard 优先级排序
    allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) 
        existingShardsAllocator.beforeAllocation(allocation);
    
    final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
    // 处理未分配的主shard
    while (primaryIterator.hasNext()) 
        final ShardRouting shardRouting = primaryIterator.next();
        if (shardRouting.primary()) 
            // 从集合中查找 GatewayAllocator 并调用 allocateUnassigned()
            ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
            allocator.allocateUnassigned(shardRouting, allocation, primaryIterator);
        
    
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) 
        existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
    
    // 处理未分配的副shard
    final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
    while (replicaIterator.hasNext()) 
        final ShardRouting shardRouting = replicaIterator.next();
        if (shardRouting.primary() == false) 
            ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
            allocator.allocateUnassigned(shardRouting, allocation, replicaIterator);
        
    
紧接着按照主副未分配shard分别调用: GatewayAllocator.allocateUnassigned()    -> GatewayAllocator.innerAllocatedUnassigned()    ->    判断主还是副shard,调用父类的 allocateUnassigned() 函数。 BaseGatewayShardAllocator.allocateUnassigned()    ->    主副shard都进入这里,只是执行 makeAllocationDecision() 时各自子类(PrimaryShardAllocator/ReplicaShardAllocator)有不同的 decision 策略。 ExistingShardsAllocator.UnassignedAllocationHandler.initialize()    ->    判断前面主、副未分配shard经过各自的 决策器返回 YES,则修改 shard 的状态,由 unassigend(未分配) 改为 initalizer(初始化)。否则直接忽略。 BaseGatewayShardAllocator.allocateUnassigned() 代码:
public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation, ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) 
    // 主副 未分配shard都会进入这里, makeAllocationDecision 是静态函数,调用子类的实现
    // PrimaryShardAllocator.makeAllocationDecision()
    // ReplicaShardAllocator.makeAllocationDecision()
    final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
    if (allocateUnassignedDecision.isDecisionTaken() == false) 
        return;
    
    // 如果前面通过决策器后返回 YES
    if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) 
        // 修改 shard 的状态,由 unassigend(未分配) 改为 initalizer(初始化)
        unassignedAllocationHandler.initialize(
            allocateUnassignedDecision.getTargetNode().getId(),
            allocateUnassignedDecision.getAllocationId(),
            getExpectedShardSize(shardRouting, allocation),
            allocation.changes());
    
    // 忽略
    else 
        unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
    
其中,在主副分片判断是否初始化时,主节点会调用 fetch.Data() 向其他节点询问这个 shard 的信息,确定它在哪个 shard。 PrimaryShardAllocator.makeAllocationDecision()    => ReplicaShardAllocator.makeAllocationDecision()    => 发送的RPC请求如下: internal : gateway / local/ started_shards 其他节点会执行下列方向从本地查询shard信息并响应: gateway.TransportNodesListGatewaystartedShards#node0peration  在确定了节点信息之后,会调用系列方法判断shard是否可以进行分配,如果可以则返回 AllocateUnassignedDecision.yes 调用 AllocationDeciders.canAllocate() 判断这个shard是否可以在某个节点上进行分配    => <br> 如果是YES会修改状态并调用初始化: ExistingShardsAllocator.UnassignedAllocationHandler.initialize()     => RoutingNodes.UnassignedIterator.initialize()    =>    返回一个  ShardRouting 对象。 <br> ShardRouting 对象记录着这个 shard 的详细信息:
private final ShardId shardId;  //分片id
private final String currentNodeId; //当前所在节点id
private final String relocatingNodeId;  //重新分配的节点id
private final boolean primary;  //是否主
private final ShardRoutingState state;  //分片状态

BalancedShardsAllocator 的处理过程。

会创建 Balancer ,后续过程由 Balancer 接管。 Balancer.allocateUnassigned()    =>    未分配shard的分配判断,同样会调用 RoutingNodes.initializeShard() 修改初始化状态 BalancedShardsAllocator.decideAllocateUnassigned()    =>    判断是否有一个node可以对这个shard进行分配,根据负载来的。同样返回判断结果 Decision.YES。 BalancedShardsAllocator.allocate() 代码如下:
public void allocate(RoutingAllocation allocation) 
    if (allocation.routingNodes().size() == 0) 
        failAllocationOfNewPrimaries(allocation);
        return;
    
    final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
    // 根据权重算法和decider决定把shard分配到哪个节点。同样将决策后的分配信息更新到集群状态,由Master广播下去。
    balancer.allocateUnassigned();
    // 对状态为started的分片根据decider来判断是否需要“move”,
    balancer.moveShards();
         //  根据权重函数平衡集群模型上的节点。
    balancer.balance();
reroute 的结果最终时如何由master通知到集群出去的? RoutingNodes.initializeShard()
public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId, long expectedSize, RoutingChangesObserver routingChangesObserver) 
    ensureMutable();
    assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
    // 初始化 ShardRouting
    ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
    // 添加到目的节点的节点信息,这个操作在master执行,生成了新的集群转台后续再广播出去
    node(nodeId).add(initializedShard);
    inactiveShardCount++;
    if (initializedShard.primary()) 
        inactivePrimaryCount++;
    
    addRecovery(initializedShard);
    assignedShardsAdd(initializedShard);
    // 通知shard路由改变观察者,
    routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
    return initializedShard;

会调用观察者通知函数:

RoutingChangesObserver.shardInitialized()    =>    观察者通知 RoutingNodesChangedObserver.setChanged()    =>    修改 changed==true 最终会判断 changed==true 的地方: MasterService.runTasks()    ->  最终执行任务 TaskInputs MasterService.calculateTaskOutputs()    -> 调用提交任务时定义的  execute() 函数 MasterService.executeTasks()    -> 调用提交任务时定义的  execute() 函数,并返回一个 ClusterTasksResult 对象,封装了执行成功与否以及执行完毕后集群状态 ClusterState ClusterStateUpdateTask.execute    =>    执行集群任务 AllocationService.reroute()    =>    开始 reroute 流程,判断集群状态中记录的所有index的shard是否有未分配的,并判断是否能进行某个节点分配 RoutingAllocation.routingNodesChanged()    =>    判断路由信息是否改变。 RoutingNodesChangedObserver.isChanged()    =>    判断路由信息是否改变,如果改变了则返回新的集群状态。 MasterService.publish()     ->     判断任务结果是否有集群状态变更消息需要推送出去,调用 clusterStatePublisher 消息推送者发送消息
【集群重启】 集群启动造成的 allocation 过程。
【集群节点加入】 在集群节点加入时会造成 allocation 过程。
【索引新增】 在索引新增时也会造成 allocation 过程。 新增索引请求:
【修改replica】 在修改索引的 replica 的备份数量也会造成 allocation 过程。

以上是关于分片和副本 allocation的主要内容,如果未能解决你的问题,请参考以下文章

ES 调整分片和副本数

clickhouseclickhouse 副本与分片 副本详解

大数据ClickHouse进阶:副本与分片

Elasticsearch 中的分片和副本

5.elasticsearch-分片

2021年大数据Kafka:Kafka的分片和副本机制