Elasticsearch Source Code: Update Performance Analysis

Posted by 衣舞晨风


Reading source code with questions in mind, part 3: Elasticsearch update performance
The code analysis is based on: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

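Goal

Before reading the source, here is the question I had about updates:
Why is there a fairly large performance gap between updates and plain writes (indexing) in Elasticsearch?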

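Source code analysis

I recommend reading this first: [Elasticsearch Source Code] Write Analysis (【Elasticsearch源码】 写入分析)

As that article shows, when a bulk request is finally executed in doRun() of TransportShardBulkAction, it is still just a loop that handles the items one by one. There is nothing magical about it.

Let's look at the code that runs for each item, executeBulkItemRequest, which is called from doRun():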
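To make the "one item at a time" point concrete, here is a minimal sketch of such a loop. It is a toy model and not the Elasticsearch implementation: the class BulkLoopSketch, its fields, and the "new-field:" prefix used to simulate an item that needs a mapping update are all hypothetical. The only thing it borrows from the real code is the contract that the per-item method returns true when the item finished on the current thread and false when it has to wait for an asynchronous mapping update, in which case the loop stops and is resumed later.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkLoopSketch {
    final List<String> items;                 // the bulk items, in request order
    final List<String> done = new ArrayList<>();
    int next = 0;                             // index of the next item to execute
    boolean mappingKnown = false;             // hypothetical: has the mapping update been applied?
    Runnable pendingMappingUpdate;            // non-null while an item waits for a mapping update

    BulkLoopSketch(List<String> items) {
        this.items = items;
    }

    // Returns true if the item completed on this thread,
    // false if it must wait for a (simulated) mapping update.
    boolean executeItem(String item) {
        if (item.startsWith("new-field:") && !mappingKnown) {
            // When the callback fires, the mapping is known and the loop is resumed;
            // the same item is then executed again.
            pendingMappingUpdate = () -> {
                mappingKnown = true;
                pendingMappingUpdate = null;
                run();
            };
            return false;
        }
        done.add(item);
        next++;
        return true;
    }

    // The loop itself: plain sequential processing, no batching.
    void run() {
        while (next < items.size()) {
            if (!executeItem(items.get(next))) {
                return; // resumed later by pendingMappingUpdate
            }
        }
    }

    public static void main(String[] args) {
        BulkLoopSketch bulk = new BulkLoopSketch(List.of("a", "new-field:b", "c"));
        bulk.run();
        System.out.println("after first pass: " + bulk.done);
        bulk.pendingMappingUpdate.run();
        System.out.println("after mapping update: " + bulk.done);
    }
}
```

The point of the sketch is only that a bulk request does not make each item cheaper: whatever a single update costs is paid once per item.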

    /**
     * Executes bulk item requests and handles request execution exceptions.
     * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
     *                      a mapping update that will finish and invoke the listener on a different thread
     */
    static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                       MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                       ActionListener<Void> itemDoneListener) throws Exception {
        final DocWriteRequest.OpType opType = context.getCurrent().opType();

        final UpdateHelper.Result updateResult;
        if (opType == DocWriteRequest.OpType.UPDATE) {
            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
            try {
                // fetch the current document and translate the update into an index, delete or noop
                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
            } catch (Exception failure) {
                // we may fail translating a update to index or delete operation
                // we use index result to communicate failure while translating update request
                final Engine.Result result =
                    new Engine.IndexResult(failure, updateRequest.version());
                context.setRequestToExecute(updateRequest);
                context.markOperationAsExecuted(result);
                context.markAsCompleted(context.getExecutionResult());
                return true;
            }
            // execute translated update request
            switch (updateResult.getResponseResult()) {
                case CREATED:
                case UPDATED:
                    IndexRequest indexRequest = updateResult.action();
                    IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                    MappingMetadata mappingMd = metadata.mapping();
                    indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                    context.setRequestToExecute(indexRequest);
                    break;
                case DELETED:
                    context.setRequestToExecute(updateResult.action());
                    break;
                case NOOP:
                    context.markOperationAsNoOp(updateResult.action());
                    context.markAsCompleted(context.getExecutionResult());
                    return true;
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
            }
        } else {
            context.setRequestToExecute(context.getCurrent());
            updateResult = null;
        }

        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

        final IndexShard primary = context.getPrimary();
        final long version = context.getRequestToExecute().version();
        final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
        final Engine.Result result;
        if (isDelete) {
            final DeleteRequest request = context.getRequestToExecute();
            result = primary.applyDeleteOperationOnPrimary(version, request.id(), request.versionType(),
                request.ifSeqNo(), request.ifPrimaryTerm());
        } else {
            final IndexRequest request = context.getRequestToExecute();
            result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
                    request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
                    request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
        }
        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

            try {
                primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
                    new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
                    MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
            } catch (Exception e) {
                logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
                onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                return true;
            }

            mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
                new ActionListener<>() {
                    @Override
                    public void onResponse(Void v) {
                        context.markAsRequiringMappingUpdate();
                        waitForMappingUpdate.accept(
                            ActionListener.runAfter(new ActionListener<>() {
                                @Override
                                public void onResponse(Void v) {
                                    assert context.requiresWaitingForMappingUpdate();
                                    context.resetForExecutionForRetry();
                                }

                                @Override
                                public void onFailure(Exception e) {
                                    context.failOnMappingUpdate(e);
                                }
                            }, () -> itemDoneListener.onResponse(null))
                        );
                    }

                    @Override
                    public void onFailure(Exception e) {
                        onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                        // Requesting mapping update failed, so we don't have to wait for a cluster state update
                        assert context.isInitial();
                        itemDoneListener.onResponse(null);
                    }
                });
            return false;
        } else {
            onComplete(result, context, updateResult);
        }
        return true;
    }

    /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action).
     */
    public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
        // This is a realtime get.
        // The lookup eventually reaches InternalEngine:
        // get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper)
        // (that code is shown further below)
        final GetResult getResult = indexShard.getService().getForUpdate(
            request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
        return prepare(indexShard.shardId(), request, getResult, nowInMillis);
    }

    public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
        // realtime is always true here
        return get(id, new String[]{RoutingFieldMapper.NAME}, true,
            Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
    }

    private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        currentMetric.inc();
        try {
            long now = System.nanoTime();
            GetResult getResult =
                innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

            if (getResult.isExists()) {
                existsMetric.inc(System.nanoTime() - now);
            } else {
                missingMetric.inc(System.nanoTime() - now);
            }
            return getResult;
        } finally {
            currentMetric.dec();
        }
    }

    private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
        if (get.exists() == false) {
            get.close();
        }

        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }

        try {
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
        } finally {
            get.close();
        }
    }

    public Engine.GetResult get(Engine.Get get) {
        readAllowed();
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper == null) {
            return GetResult.NOT_EXISTS;
        }
        return getEngine().get(get, mapper, this::wrapSearcher);
    }

    /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
     * noop).
     */
    protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
        if (getResult.isExists() == false) {
            // If the document didn't exist, execute the update request as an upsert
            return prepareUpsert(shardId, request, getResult, nowInMillis);
        } else if (getResult.internalSourceRef() == null) {
            // no source, we can't do anything, throw a failure...
            throw new DocumentSourceMissingException(shardId, request.id());
        } else if (request.script() == null && request.doc() != null) {
            // The request has no script, it is a new doc that should be merged with the old document
            return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
        } else {
            // The request has a script (or empty script), execute the script and prepare a new index request
            return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
        }
    }

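The prepare methods are located in org/elasticsearch/action/update/UpdateHelper.java.

The code shows that the update logic has two steps:

  • Step 1: fetch the current data of the document to be updated
  • Step 2: execute the operation that updates the document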
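The two steps can be sketched as a read-merge-write cycle. The following is a toy model under stated assumptions, not the UpdateHelper code: UpdatePrepareSketch and its fields are hypothetical names, documents are flat maps held in memory, a missing document is simply created from the partial document, and scripted updates and deletes are left out. The counters only exist to show that an update always pays for a read before it can write, while a plain index call does not.

```java
import java.util.HashMap;
import java.util.Map;

public class UpdatePrepareSketch {
    enum Result { CREATED, UPDATED, NOOP }

    // Hypothetical stand-in for the shard: document id -> _source as a flat map.
    final Map<String, Map<String, Object>> store = new HashMap<>();
    int reads = 0;   // number of "get" lookups performed
    int writes = 0;  // number of index operations performed

    // Plain indexing: one write, no lookup of the previous source.
    void index(String id, Map<String, Object> doc) {
        writes++;
        store.put(id, new HashMap<>(doc));
    }

    // Partial-document update: step 1 reads, step 2 writes (unless nothing changed).
    Result update(String id, Map<String, Object> partialDoc) {
        reads++;
        Map<String, Object> current = store.get(id);
        if (current == null) {
            // Document missing: treated as an upsert in this sketch.
            writes++;
            store.put(id, new HashMap<>(partialDoc));
            return Result.CREATED;
        }
        Map<String, Object> merged = new HashMap<>(current);
        merged.putAll(partialDoc);
        if (merged.equals(current)) {
            // Nothing changed, so the write is skipped entirely.
            return Result.NOOP;
        }
        writes++;
        store.put(id, merged);
        return Result.UPDATED;
    }

    public static void main(String[] args) {
        UpdatePrepareSketch shard = new UpdatePrepareSketch();
        System.out.println(shard.update("1", Map.of("a", 1)));
        System.out.println(shard.update("1", Map.of("b", 2)));
        System.out.println(shard.update("1", Map.of("b", 2)));
        System.out.println("reads=" + shard.reads + ", writes=" + shard.writes);
    }
}
```

In this model three updates cost three reads and two writes, whereas three index calls would cost three writes and no reads.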

Step 1 ultimately calls the get method in InternalEngine. The code is as follows:

    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            // is this a realtime get?
            if (get.realtime()) {
                final VersionValue versionValue;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                        get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                        )) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    }
                    // should the document be read from the translog?
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
            } else {
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            }
        }
    }
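The lookup order in this method (version map first, then the translog if a location was recorded, otherwise a refresh followed by a searcher lookup) can be illustrated with a small in-memory model. This is a hedged sketch, not engine code: RealtimeGetSketch, Entry, and the trackLocation flag are hypothetical, the "translog" and the "searcher" are plain maps, and version and sequence-number conflict checks are omitted. It only illustrates why a realtime get on a recently written document can force a refresh.

```java
import java.util.HashMap;
import java.util.Map;

public class RealtimeGetSketch {
    // Hypothetical stand-in for a version-map entry of a not-yet-refreshed operation.
    static final class Entry {
        final String source;
        final boolean deleted;
        final boolean hasTranslogLocation;

        Entry(String source, boolean deleted, boolean hasTranslogLocation) {
            this.source = source;
            this.deleted = deleted;
            this.hasTranslogLocation = hasTranslogLocation;
        }
    }

    final Map<String, Entry> versionMap = new HashMap<>(); // operations not yet visible to the searcher
    final Map<String, String> searcher = new HashMap<>();  // what the last refresh made visible
    int refreshes = 0;

    void index(String id, String source, boolean trackLocation) {
        versionMap.put(id, new Entry(source, false, trackLocation));
    }

    void delete(String id) {
        versionMap.put(id, new Entry(null, true, false));
    }

    // Makes all pending operations visible to the searcher.
    void refresh() {
        for (Map.Entry<String, Entry> e : versionMap.entrySet()) {
            if (e.getValue().deleted) {
                searcher.remove(e.getKey());
            } else {
                searcher.put(e.getKey(), e.getValue().source);
            }
        }
        versionMap.clear();
        refreshes++;
    }

    // Realtime get as used by update: returns the source, or null if the document does not exist.
    String realtimeGet(String id) {
        Entry entry = versionMap.get(id);
        if (entry != null) {
            if (entry.deleted) {
                return null;              // deleted since the last refresh
            }
            if (entry.hasTranslogLocation) {
                return entry.source;      // served from the "translog", no refresh needed
            }
            refresh();                    // otherwise make the document visible first
        }
        return searcher.get(id);
    }

    public static void main(String[] args) {
        RealtimeGetSketch engine = new RealtimeGetSketch();
        engine.index("1", "v1", false);
        System.out.println(engine.realtimeGet("1") + " refreshes=" + engine.refreshes);
        engine.index("2", "v2", true);
        System.out.println(engine.realtimeGet("2") + " refreshes=" + engine.refreshes);
    }
}
```

In the model, the first lookup has no translog location to read from and therefore pays for a refresh; the second is answered from the recorded location without one.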

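Summary

An update first has to fetch the original document. If the document is not found, the request is executed as an upsert (a new document is created); if it exists, the new version is built from the original document.

Although an update ultimately goes through the same index method in InternalEngine as a plain write, the Lucene call made for an update, softUpdateDocuments, involves two operations: marking the old document as deleted and adding the new one.

Compared with a plain insert, an update costs:

  • One extra full lookup. To guarantee consistency, update calls GET with the realtime option set to true, and this is not configurable. As the InternalEngine get code shows, when the document is in the version map but cannot be served from the translog, refreshIfNeeded is called, so an update may trigger a refresh that produces a new Lucene segment.
  • One extra delete marker.

When the data volume is large and operations are frequent, update should be used with caution.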
