《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析相关的知识,希望对你有一定的参考价值。

一、简介

索引恢复(index.recovery)是ES数据恢复过程。待恢复的数据是客户端写入成功,但未执行刷盘(flush)的Lucene分段。 例如,当节点异常重启时,写入磁盘的数据先到文件系统的缓冲,未必来得及刷盘,如果不通过某种方式将未刷盘的数据找回来,则会丢失一些数据,这是保持数据完整性的体现;另一方面,由于写入操作在多个分片副本上没有来得及全部执行,副分片需要同步成和主分片完全一致,这是数据副本一致性的体现。

根据数据分片性质,索引恢复过程可分为主分片恢复流程和副分片恢复流程。

  • 主分片从translog中自我恢复,尚未执行flush到磁盘的Lucene分段可以从translog中重建;
  • 副本分片需要从主分片中拉取Lucene分段和translog进行恢复。但是有机会跳过拉取Lucene分段的过程;

索引恢复的触发条件包括:快照备份恢复、节点加入和离开、索引的_open操作等。恢复工作一般经历以下阶段(stage),如下表所示:

阶段简介
INIT (init)恢复尚未启动
INDEX (index)恢复Lucene文件,以及在节点间负责索引数据
VERIFY_INDEX (verify_index)验证索引
TRANSLOG (translog)启动engine,重放translog,建立Lucene索引
FINALIZE (finalize)清理工作
DONE (done)完毕

主分片和副本分片恢复都会经历这些阶段,但有时候会跳过具体执行过程,只是在流程上体现出经历了这个短暂阶段。 例如副本分片恢复时会跳过TRANSLOG重放过程;主分片恢复过程中的INDEX阶段不会在节点直接复制数据。

1.2、相关配置

配置简介
indices.recovery.max_bytes_per_sec副本分片恢复的phase1过程中,主副分片节点之间传输数据的速度限制,默认为40MB/s,单位为字节。设置为0则不限速。
indices.recovery.retry_delay_state_sync由于集群状态同步导致recovery失败时,重试recovery前的等待时间,默认500ms。
indices.recovery.retry_delay_network由于网络问题导致recovery失败时,重试recovery前的等待时间,默认5s。
indices.recovery.internal_action_timeout用于某些恢复请求的RPC超时时间,默认为15 min,例如:perpare_translog、clean_files等。
indices.recovery.internal_action_long_timeout与上面的用处相同,但是超时时间更长,默认为前者的2倍。
indices.recovery.recovery_activity_timeout不活跃的recovery超时时间,默认值等于indices.recovery.internal_action_long_timeout

二、流程概述

recovery 由 clusterChanged 触发,从触发到开始执行恢复的调用关系如下:

IndicesClusterStateService#applyClusterState
    -> createOrUpdateShards()
    -> createShard()
    -> IndicesService.createShard()
    -> IndexShard.startRecovery()

IndexShard#startRecovery执行对一个特定分片的恢复流程,根据此分片的恢复类型执行相应的恢复过程:

public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                              PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                              BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
                              IndicesService indicesService) {
   
    assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
    switch (recoveryState.getRecoverySource().getType()) {
        case EMPTY_STORE:
        case EXISTING_STORE:
            markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
            threadPool.generic().execute(() -> {
                try {
                    //主分片从本地恢复
                    if (recoverFromStore()) {
                        recoveryListener.onRecoveryDone(recoveryState);
                    }
                } catch (Exception e) {
                    recoveryListener.onRecoveryFailure(recoveryState,
                            new RecoveryFailedException(recoveryState, null, e), true);
                }
            });
            break;
        case PEER:
            try {
                markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
                //副分片从远程主分片恢复
                recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
            } catch (Exception e) {
                failShard("corrupted preexisting index", e);
                recoveryListener.onRecoveryFailure(recoveryState,
                        new RecoveryFailedException(recoveryState, null, e), true);
            }
            break;
        case SNAPSHOT:
            markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
            SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
            threadPool.generic().execute(() -> {
                try {
                    final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
                    //从快照恢复
                    if (restoreFromRepository(repository)) {
                        recoveryListener.onRecoveryDone(recoveryState);
                    }
                } catch (Exception e) {
                    recoveryListener.onRecoveryFailure(recoveryState,
                            new RecoveryFailedException(recoveryState, null, e), true);
                }
            });
            break;
        case LOCAL_SHARDS:
            final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
            final Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
            final List<IndexShard> startedShards = new ArrayList<>();
            final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
            final Set<ShardId> requiredShards;
            final int numShards;
            if (sourceIndexService != null) {
                requiredShards = IndexMetaData.selectRecoverFromShards(shardId().id(),
                        sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards());
                for (IndexShard shard : sourceIndexService) {
                    if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
                        startedShards.add(shard);
                    }
                }
                numShards = requiredShards.size();
            } else {
                numShards = -1;
                requiredShards = Collections.emptySet();
            }
           
            if (numShards == startedShards.size()) {
                assert requiredShards.isEmpty() == false;
                markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
                threadPool.generic().execute(() -> {
                    try {
                        //从本节点其他分片恢复(shrink时)
                        if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
                                .filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
                            recoveryListener.onRecoveryDone(recoveryState);
                        }
                    } catch (Exception e) {
                        recoveryListener.onRecoveryFailure(recoveryState,
                                new RecoveryFailedException(recoveryState, null, e), true);
                    }
                });
            } else {
                final RuntimeException e;
                if (numShards == -1) {
                    e = new IndexNotFoundException(resizeSourceIndex);
                } else {
                    e = new IllegalStateException("not all required shards of index " + resizeSourceIndex
                            + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
                            + shardId());
                }
                throw e;
            }
            break;
        default:
            throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
    }
}

此时线程池为ClusterApplierService#updateTask。执行具体的恢复工作时,会到另一个线程池中执行。无论哪种恢复类型,都在generic线程池中。

本章我们主要介绍主分片和副分片的恢复流程。Snapshotshrink属于比较独立的功能,在后续的章节中单独分析。

三、主分片恢复流程

3.1、INIT阶段

一个分片的恢复流程中,从开始执行恢复的那一刻起,被标记为INIT阶段,INIT阶段在IndexShard#startRecovery函数的参数中传入,在判断此分片属于恢复类型之前就被设置为INIT阶段。

markAsRecovering("from store", recpveryState);

public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
        IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
    synchronized (mutex) {
        if (state == IndexShardState.CLOSED) {
            throw new IndexShardClosedException(shardId);
        }
        if (state == IndexShardState.STARTED) {
            throw new IndexShardStartedException(shardId);
        }
        if (state == IndexShardState.RECOVERING) {
            throw new IndexShardRecoveringException(shardId);
        }
        if (state == IndexShardState.POST_RECOVERY) {
            throw new IndexShardRecoveringException(shardId);
        }
        this.recoveryState = recoveryState;
        return changeState(IndexShardState.RECOVERING, reason);
    }
}

然后在新的线程池中执行主分片恢复流程:

threadPool.generic().execute(() -> {    
    try {
        //主分片从本地恢复
        if (recoverFromStore()) {
            //恢复成功
            //向Master发送action为 internal:cluster/shard/started的RPC请求
            recoveryListener.onRecoveryDone(recoveryState);
        }
    } catch (Exception e) {
        //恢复失败
        //关闭Engin,向Master发送 internal:cluster/shard/failure的RPC请求
        recoveryListener.onRecoveryFailure(recoveryState,
                            new RecoveryFailedException(recoveryState, null, e), true);
    }
});

接下来,恢复流程在新的线程池中开始执行,开始阶段主要是一下验证工作,例如校验当前分片是否为主分片,分片状态是否异常等。

做完简单的校验工作后,进入INDEX阶段:

public void prepareForIndexRecovery() {
   
    if (state != IndexShardState.RECOVERING) {
        throw new IndexShardNotRecoveringException(shardId, state);
    }
    recoveryState.setStage(RecoveryState.Stage.INDEX);
    assert currentEngineReference.get() == null;
}

3.2、INDEX阶段

本阶段从 Lucene 读取最后一次提交的分段信息,StoreRecovery#internalRecoverFromStore获取其中的版本号,更新当前索引版本:

private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException {
    final RecoveryState recoveryState = indexShard.recoveryState();
    final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
    //进入INDEX阶段
    indexShard.prepareForIndexRecovery();
    SegmentInfos si = null;
    final Store store = indexShard.store();
    store.incRef();
    try {
        try {
            store.failIfCorrupted();
            try {
                //获取Lucene最后一次提交的分段信息,得到其中的版本号
                si = store.readLastCommittedSegmentsInfo();
            } catch (Exception e) {
                String files = "_unknown_";
                try {
                    files = Arrays.toString(store.directory().listAll());
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    files += " (failure=" + ExceptionsHelper.detailedMessage(inner) + ")";
                }
                if (indexShouldExists) {
                    throw new IndexShardRecoveryException(shardId,
                            "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
                }
            }
            if (si != null && indexShouldExists == false) {
                // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
                // its a "new index create" API, we have to do something, so better to clean it than use same data
                logger.trace("cleaning existing shard, shouldn't exists");
                Lucene.cleanLuceneIndex(store.directory());
                si = null;
            }
        } catch (Exception e) {
            throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
        }
        if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
            assert indexShouldExists;
            bootstrap(indexShard, store);
            writeEmptyRetentionLeasesFile(indexShard);
        } else if (indexShouldExists) {
            if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
                store.bootstrapNewHistory();
                writeEmptyRetentionLeasesFile(indexShard);
            }
            // since we recover from local, just fill the files and size
            try {
                final RecoveryState.Index index = recoveryState.getIndex();
                if (si != null) {
                    addRecoveredFileDetails(si, store, index);
                }
            } catch (IOException e) {
                logger.debug("failed to list file details", e);
            }
        } else {
              store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
            final String translogUUID = Translog.createEmptyTranslog(
                    indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
                    indexShard.getPendingPrimaryTerm());
            store.associateIndexWithNewTranslog(translogUUID);
                writeEmptyRetentionLeasesFile(indexShard);
        }
        //从tanslog恢复数据
        indexShard.openEngineAndRecoverFromTranslog();
        indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
        //进入FINALIZE状态
        indexShard.finalizeRecovery();
        //进入DONE阶段
        indexShard.postRecovery("post recovery from shard_store");
    } catch (EngineException | IOException e) {
        throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
    } finally {
        store.decRef();
    }
}

3.3、VERIFY_INDEX阶段

VERIFY_INDEX 中的INDEX指Lucene index,因此本阶段的作用是验证当前分片是否损坏,是否进行本项检查取决于配置项:

index.shard.check_on_startup=true

该配置的取值如下表所示:

阶段简介
false默认值,打开分片时不检查分片是否损坏
checksum检查物理损坏
true检查物理和逻辑损坏,这将消耗大量的内存和CPU资源
fix检查物理和逻辑损坏。损坏的分段

以上是关于《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析的主要内容,如果未能解决你的问题,请参考以下文章

《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化

《Elasticsearch 源码解析与优化实战》第19章:搜索速度优化

《Elasticsearch 源码解析与优化实战》第18章:写入速度优化

《Elasticsearch 源码解析与优化实战》第18章:写入速度优化

《Elasticsearch 源码解析与优化实战》样章-第 6 章 数据模型

《Elasticsearch 源码解析与优化实战》第9章:Search流程

(c)2006-2024 SYSTEM All Rights Reserved IT常识