Elasticsearch 5.x Source Code Analysis (14): Do You Really Need the nested Type?


I had long heard that queries on nested fields are an order of magnitude slower, and parent-child queries two orders of magnitude slower, but I was always only half convinced, until some recent slow-query troubleshooting finally landed me squarely in this pit. So in this post I want to talk about the ins and outs of nested.

Sometimes an index has sub-documents, similar to a SQL JOIN, and the usual approach is to model them with a nested field in the mapping, as in these two examples from my company:
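For illustration only, here is a minimal sketch of such a mapping created through the Java High Level REST Client; the orders index and the items/sku/price fields are hypothetical names, not the actual production mappings, and client is assumed to be an existing RestHighLevelClient:

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.indices.CreateIndexRequest;
    import org.elasticsearch.common.xcontent.XContentType;

    // Each order carries a nested list of line items. In Lucene, every element
    // of "items" is indexed as a separate hidden document stored in the same
    // block as its parent, which is what makes correlated queries possible.
    CreateIndexRequest request = new CreateIndexRequest("orders");
    request.mapping(
        "{"
        + "  \"properties\": {"
        + "    \"order_id\": { \"type\": \"keyword\" },"
        + "    \"items\": {"
        + "      \"type\": \"nested\","
        + "      \"properties\": {"
        + "        \"sku\":   { \"type\": \"keyword\" },"
        + "        \"price\": { \"type\": \"double\" }"
        + "      }"
        + "    }"
        + "  }"
        + "}", XContentType.JSON);
    client.indices().create(request, RequestOptions.DEFAULT);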

Now consider a possible query scenario against such a mapping.

Anyone familiar with ES knows that if you have to run correlated queries against the sub-documents, there is no way around it: you must use the nested type. But let me highlight one point: queries of this kind usually end up with very small result sets. Whether the filtering starts from the parent or from the nested fields, the final result set tends to be tiny, and in that case there is nothing wrong with the query; it is very fast. This matched my long-standing impression that nested queries are not really as bad as people claim compared with ordinary queries.
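Sketched with the Java client's query builders (using the hypothetical field names from the mapping above), such a correlated query looks like this:

    import org.apache.lucene.search.join.ScoreMode;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;

    // "Orders containing a line item whose sku is sku-1 AND whose price is
    // over 100": both conditions must hold on the SAME items element, which
    // is exactly the guarantee only the nested type provides.
    QueryBuilder correlated = QueryBuilders.nestedQuery(
        "items",
        QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("items.sku", "sku-1"))
            .must(QueryBuilders.rangeQuery("items.price").gt(100)),
        ScoreMode.None);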
Then again, do all of our queries really need the nested behavior? Put differently, what if the query only touches a single nested field, with no correlation across fields within the same object?

So let's take a look at how nested is implemented.

This is the core algorithm of the nested aggregation, which we can look at in three parts.

The query path is very similar to the aggregation above: it ends up producing an ESToParentBlockJoinQuery, which in turn constructs a ToParentBlockJoinQuery.

This class is long but not hard to understand, so I won't paste all of it here. The most eye-catching piece is probably the inner TwoPhaseIterator, which is mainly used by BlockJoinScorer. As the name suggests, it performs two-phase iteration; in essence it coordinates the two joined doc-ID spaces to find the matching documents.
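For orientation, this is roughly what building such a block-join query looks like directly against Lucene; a minimal sketch in which the docType parent filter and the items.sku term are hypothetical (ES derives the real parent filter from the nested path internally):

    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TermQuery;
    import org.apache.lucene.search.join.BitSetProducer;
    import org.apache.lucene.search.join.QueryBitSetProducer;
    import org.apache.lucene.search.join.ScoreMode;
    import org.apache.lucene.search.join.ToParentBlockJoinQuery;

    // The parents filter produces, per segment, a BitSet marking which docs
    // are parent documents; building and caching that BitSet is the cost
    // discussed below.
    BitSetProducer parentsFilter =
        new QueryBitSetProducer(new TermQuery(new Term("docType", "parent")));
    // The child-side condition, evaluated on the hidden nested documents.
    Query childQuery = new TermQuery(new Term("items.sku", "sku-1"));
    // Joins every matching child up to the parent of its block.
    Query join = new ToParentBlockJoinQuery(childQuery, parentsFilter, ScoreMode.None);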
That is not the main point here, though; what I want to call out instead is the passage where the parent BitSet is produced.

Combining these two nested paths, the aggregation and the query, I arrive at the following assertion:

Since the overhead is dominated by generating this ParentBitSet, the optimization is to avoid the nested type wherever possible. If you must keep it for certain correlated queries, then for the queries with no correlation requirement you can copy_to the values into dedicated fields on the parent and run those queries and aggregations there instead (trading space for performance).
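A sketch of that copy_to trick, again with the hypothetical orders/items names and the imports from the earlier sketch (note that whether copy_to may cross the nested boundary has varied across versions, so verify against the documentation for your version):

    // "items.sku" stays nested for the correlated queries, but every sku value
    // is also copied into the flat "sku_copy" field on the parent document, so
    // uncorrelated filters and aggregations can skip the block join entirely.
    CreateIndexRequest request = new CreateIndexRequest("orders");
    request.mapping(
        "{"
        + "  \"properties\": {"
        + "    \"sku_copy\": { \"type\": \"keyword\" },"
        + "    \"items\": {"
        + "      \"type\": \"nested\","
        + "      \"properties\": {"
        + "        \"sku\": { \"type\": \"keyword\", \"copy_to\": \"sku_copy\" }"
        + "      }"
        + "    }"
        + "  }"
        + "}", XContentType.JSON);
    client.indices().create(request, RequestOptions.DEFAULT);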

Below is the optimization, compared against the beginning of the article, of turning aggregations on three nested fields into aggregations on three flat fields on the parent:
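In query-builder terms the change looks like this (a sketch; the aggregation and field names are hypothetical):

    import org.elasticsearch.search.aggregations.AggregationBuilder;
    import org.elasticsearch.search.aggregations.AggregationBuilders;

    // Before: a terms aggregation wrapped in a nested aggregation, which has
    // to walk the child doc-ID space and join every bucket back through the
    // ParentBitSet.
    AggregationBuilder before = AggregationBuilders
        .nested("items_agg", "items")
        .subAggregation(AggregationBuilders.terms("by_sku").field("items.sku"));

    // After: a plain terms aggregation on the copy_to field of the parent.
    AggregationBuilder after = AggregationBuilders.terms("by_sku").field("sku_copy");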

To wrap up, here are the results of an optimization I did a few days ago on another large index with over 2 billion documents; part of the improvement likewise comes from copy_to-ing nested fields out to the parent for querying. The drop in average latency was significant.

I hope this helps with your own index optimization.

That's all for this post!

Elasticsearch Source Code: GET Analysis

Learning the source code with questions in mind, part 4: Elasticsearch GET.
The code analysis is based on: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

Building on the previous three posts, we can briefly summarize Elasticsearch:

  • ES is a cluster, so every node needs to interact with the other nodes, and these interactions go through NodeClient.
  • Both RPC and HTTP requests in ES are handled by ES's own wrappers built on top of Netty.
  • The Transport*Action classes are the core set (see the registration sketch below):
    • Action -> Transport*Action
    • TransportAction -> TransportHandler (even the local node dispatches work by sending a request that ends up in the TransportHandler)
    • The real work is done in doExecute(…) of the Transport*Action class (or its parent)
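For example, the wiring from an Action to its Transport*Action is registered roughly like this (paraphrased and abridged from ActionModule#setupActions in the ES source; the exact code varies by version):

    // ActionModule#setupActions (abridged): each Action is registered together
    // with the Transport*Action class that actually executes it.
    actions.register(GetAction.INSTANCE, TransportGetAction.class);
    actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
        TransportShardMultiGetAction.class);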

Purpose

Before reading the source, let me lay out the points about the GET flow that puzzled me:

  • Is the target shard located by hashing the document _id?
  • How does a query by document _id achieve real-time visibility?

Source Code Analysis

This second part walks through the code; if you would rather not read it, skip ahead to the summary in part three.

Searching the source for the /{index}/_doc/{id} route leads to RestGetAction, and combining that with the earlier summary, we already know that the real work is done by TransportGetAction.
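The route registration that makes this findable looks like this (abridged from RestGetAction in the 7.x source); its prepareRequest then builds a GetRequest and sends it through NodeClient:

    // RestGetAction#routes (abridged): GET and HEAD on /{index}/_doc/{id}
    @Override
    public List<Route> routes() {
        return unmodifiableList(asList(
            new Route(GET, "/{index}/_doc/{id}"),
            new Route(HEAD, "/{index}/_doc/{id}")));
    }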

[Figure: TransportGetAction class hierarchy (/images/elasticsearch-get-source-code-analysis/TransportGetAction.png)]

doExecute turns up in TransportGetAction's parent class TransportSingleShardAction:

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(request, listener).start();
    }

    // in TransportSingleShardAction's AsyncSingleAction
    private AsyncSingleAction(Request request, ActionListener<Response> listener) {
        this.listener = listener;

        ClusterState clusterState = clusterService.state();
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
        }

        // list of the nodes in the cluster
        nodes = clusterState.nodes();
        ClusterBlockException blockException = checkGlobalBlock(clusterState);
        if (blockException != null) {
            throw blockException;
        }

        String concreteSingleIndex;
        if (resolveIndex(request)) {
            concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
        } else {
            concreteSingleIndex = request.index();
        }
        this.internalRequest = new InternalRequest(request, concreteSingleIndex);

        // resolveRequest is overridden in TransportGetAction:
        // parse the request and apply any routing specified on it
        resolveRequest(clusterState, internalRequest);

        blockException = checkRequestBlock(clusterState, internalRequest);
        if (blockException != null) {
            throw blockException;
        }

        // use the routing algorithm to obtain an iterator over the target
        // shard's copies, ordered according to the preference
        this.shardIt = shards(clusterState, internalRequest);
    }

    // in TransportGetAction
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
    }

Next, let's look at getShards(…) in OperationRouting to see how the concrete shardId is obtained:

    public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
                                   @Nullable String preference) {
        return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
            clusterState.nodes(), preference, null, null);
    }

    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
        int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
        return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
    }

    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        // for the semantics of the routing parameter, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (routing == null) {
            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        if (indexMetadata.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
    }

    private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of the original index to hash documents
        return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
    }

At this point we can see that ES locates the shard by hashing the document _id (or the routing value, when one is given).
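To make the scaled-shard arithmetic concrete, here is a standalone sketch of the same calculation with made-up numbers (the real hash comes from Murmur3HashFunction):

    // Sketch: an index created with number_of_shards = 2 typically resolves
    // index.number_of_routing_shards to 1024, so routingFactor = 1024 / 2 = 512.
    // Shrinking or splitting changes the shard count but keeps routingNumShards,
    // which is why the hash is taken modulo routingNumShards first.
    static int scaledShardId(int hash, int routingNumShards, int routingFactor) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    // e.g. hash = -1467: floorMod(-1467, 1024) = 581, and 581 / 512 = 1 -> shard 1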

Next, how is real-time visibility achieved?

On the data node, the entry point that receives the coordinating node's request is TransportSingleShardAction.ShardTransportHandler#messageReceived:

        @Override
        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
            }
            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
        }

The concrete execution happens in the subclass TransportGetAction#asyncShardOperation:

    @Override
    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        // for the realtime option, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (request.realtime()) { // we are not tied to a refresh cycle here anyway
            super.asyncShardOperation(request, shardId, listener);
        } else {
            indexShard.awaitShardSearchActive(b -> {
                try {
                    super.asyncShardOperation(request, shardId, listener);
                } catch (Exception ex) {
                    listener.onFailure(ex);
                }
            });
        }
    }

Fetching the document from TransportGetAction#asyncShardOperation ultimately calls:

    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        // for the realtime and refresh options, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (request.refresh() && !request.realtime()) {
            indexShard.refresh("refresh_flag_get");
        }

        GetResult result = indexShard.getService().get(request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
        return new GetResponse(result);
    }

shardOperation first checks whether a refresh is needed and then calls indexShard.getService().get() to read the data into a GetResult. Reading and filtering happen in ShardGetService#get(), which calls GetResult getResult = innerGet(…); to obtain the result. The GetResult class holds the actual data that was read. The core read logic is in ShardGetService#innerGet(…):

    private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

        // call into the Engine to fetch the data
        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
        if (get.exists() == false) {
            get.close();
        }

        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }

        try {
            // build the response
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
        } finally {
            get.close();
        }
    }


    // filter the requested stored fields and the _source (source filtering
    // works at the field level) and store the result in a GetResult object
    private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields, FetchSourceContext fetchSourceContext,
                                                   Engine.GetResult get, MapperService mapperService) {
        assert get.exists() : "method should only be called if document could be retrieved";

        // check first if stored fields to be loaded don't contain an object field
        DocumentMapper docMapper = mapperService.documentMapper();
        if (storedFields != null) {
            for (String field : storedFields) {
                Mapper fieldMapper = docMapper.mappers().getMapper(field);
                if (fieldMapper == null) {
                    if (docMapper.mappers().objectMappers().get(field) != null) {
                        // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                        throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                    }
                }
            }
        }

        Map<String, DocumentField> documentFields = null;
        Map<String, DocumentField> metadataFields = null;
        BytesReference source = null;
        DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
        // force fetching source if we read from translog and need to recreate stored fields
        boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
                Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
        FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
            forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
        if (fieldVisitor != null) {
            try {
                docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to get id [" + id + "]", e);
            }
            source = fieldVisitor.source();

            // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
            if (get.isFromTranslog()) {
                // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
                // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
                if (forceSourceForComputingTranslogStoredFields == false) {
                    try {
                        source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
                    } catch (IOException e) {
                        throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
                    }
                } else {
                    // Slow path: recreate stored fields from original source
                    assert source != null : "original source in translog must exist";
                    SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source, XContentHelper.xContentType(source),
                        fieldVisitor.routing());
                    ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
                    assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
                    // update special fields
                    doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
                    doc.version().setLongValue(docIdAndVersion.version);

                    // retrieve stored fields from parsed doc
                    fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
                    for (IndexableField indexableField : doc.rootDoc().getFields()) {
                        IndexableFieldType fieldType = indexableField.fieldType();
                        if (fieldType.stored()) {
                            FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
                            if (status == StoredFieldVisitor.Status.YES) {
                                if (indexableField.numericValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
                                } else if (indexableField.binaryValue() != null) {
                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
                                } else if (indexableField.stringValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
                                }
                            } else if (status == StoredFieldVisitor.Status.STOP) {
                                break;
                            }
                        }
                    }
                    // retrieve source (with possible transformations, e.g. source filters)
                    source = fieldVisitor.source();
                }
            }

            // put stored fields into result objects
            if (!fieldVisitor.fields().isEmpty()) {
                fieldVisitor.postProcess(mapperService::fieldType);
                documentFields = new HashMap<>();
                metadataFields = new HashMap<>();
                for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                    if (mapperService.isMetadataField(entry.getKey())) {
                        metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    } else {
                        documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    }
                }
            }
        }

        if (source != null) {
            // apply request-level source filtering
            if (fetchSourceContext.fetchSource() == false) {
                source = null;
            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
                Map<String, Object> sourceAsMap;
                // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
                //  Do we care?
                Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
                XContentType sourceContentType = typeMapTuple.v1();
                sourceAsMap = typeMapTuple.v2();
                sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
                try {
                    source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
                } catch (IOException e) {
                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
                }
            }
        }

        return new GetResult(shardId.getIndexName(), id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
            get.version(), get.exists(), source, documentFields, metadataFields);
    }

Now let's look at the read path inside InternalEngine:

InternalEngine#get acquires a read lock. It handles the realtime option: if true, it first checks whether there is data that still needs to be refreshed, then reads through a Searcher. Searcher is a wrapper around Lucene's IndexSearcher.

Starting from ES 5.x, real-time GET no longer reads from the translog; it reads only from Lucene, and realtime is implemented by relying on refresh instead. See the official PR: https://github.com/elastic/elasticsearch/pull/20102

    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            // handle the realtime option: decide whether a refresh is needed
            if (get.realtime()) {
                final VersionValue versionValue;
                // entries in the versionMap are added at index time and are never written to disk
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                        get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                        )) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    }
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    // trigger a refresh if needed
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                // read the data through a Searcher
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
            } else {
                // we expose what has been externally exposed in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            }
        }
    }

Summary

  • GET locates the target shard by hashing the document _id.
  • Real-time visibility for GET by document _id is achieved by relying on refresh.
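Tying the two flags from shardOperation together, here is a client-side sketch (the index and id are hypothetical, and client is assumed to be an existing RestHighLevelClient):

    import org.elasticsearch.action.get.GetRequest;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.RequestOptions;

    // Default: a realtime GET, served through the versionMap/refreshIfNeeded
    // path, so the latest write is visible even before the next refresh cycle.
    GetRequest realtimeGet = new GetRequest("orders", "order-1");

    // Non-realtime GET with refresh: hits the
    // request.refresh() && !request.realtime() branch in shardOperation,
    // forcing a refresh before reading from the external searcher.
    GetRequest refreshedGet = new GetRequest("orders", "order-1")
        .realtime(false)
        .refresh(true);

    GetResponse response = client.get(refreshedGet, RequestOptions.DEFAULT);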

References:
《Elasticsearch源码解析与优化实战》
