ES indexSort 原理源码解析

Posted 水的精神

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ES indexSort 原理源码解析相关的知识,希望对你有一定的参考价值。

在上一篇文章中,是我对es indexSort的优化对检索性能提升的测试。测试结论是:好的情况下,会有50%的性能提升效果。这让我对它是如何做到的,产生了浓烈的兴趣。

在这篇文章中,结合源码对原理进行一个解析

这里我想先提出两个问题

  1. 如何做到indexSort的,对已经写入的数据,再加入新的数据的时候,是不是需要对原来的数据进行重新排序?

  1. 更新逻辑是什么样的?在使用indexSort后,假如数据要更新,如何保证数据有序?

  1. 为什么会对检索性能提升这么多?

在下文中,会给出答案。

IndexSort使用场景有限

需要需要满足两个条件

  • 需要查询的Sort顺序与IndexSorting的顺序相同

  • 并且不需要获取符合条件的记录总数(TotalHits), "track_total_hits": false

除此之外,还有一些场景不能生效

对于字符串进行Range查询,且Range范围内有很多符合条件的Term的场景。这个场景下,查询可能会慢在两个地方,一个是Range范围内符合条件的Term非常多,扫描FST耗时很大,另一个如果这些Term对应的doc数很多,要构造BitSet也会非常耗时。因为利用IndexSorting的提前中断是发生在BitSet构造好之后,所以并不能优化到这个地方的性能。

对数字类型在BKD-Tree上进行范围查找时,因为BKD-Tree里的docID不是顺序排列的,所以并不像倒排链一样可以顺序读取。如果BKD-Tree上符合条件的docID很多,构造BitSet也很耗时,也不是IndexSorting能够优化到的。

检索需要依赖相关性的时候 ,因为根据相关性排序,一定要扫描一下全表才知道一个全局的分数。不可能通过局部的分数精准的取出TopK。

也就是说,假如在一次查询中,如果时间花费在 构建bitSet上。那么就算用了indexSet也不会有太好的效果。索引可以使用 profile分析一下检索语句,检索过程。

indexSort对检索性能提升的本质

实际上就是对数据做预处理 + 提前终止查询过程

与查询时的Sort不同,IndexSorting是一种预排序,即数据预先按照某种方式进行排序,它是Index的一个设置,不可更改。大家知道,Elasticsearch的底层是Lucene,Lucene中是以Segment为单位进行查询的,这里说的IndexSorting对数据进行预排序也是在每个Segment内有序的。

一个Segment中的每个文档,都会被分配一个docID,(注意这里docID 是内存储用的,不是文档里边的_id),docID从0开始,顺序分配。在没有IndexSorting时,docID是按照文档写入的顺序进行分配的,在设置了IndexSorting之后,docID的顺序就与IndexSorting的顺序一致。

举个例子来说,假如文档中有一列为Timestamp,我们在IndexSorting中设置按照Timestamp逆序排序,那么在一个Segment内,docID越小,对应的文档的Timestamp越大,即按照Timestamp从大到小的顺序分配docID。

Lucene中的倒排链都是按照docID从小到大的顺序排列的,在进行组合条件查询时,也是按照docID从小到大的顺序选出符合条件的doc。那么当查询时的Sort顺序与IndexSorting的顺序相同时,会发生什么呢?

比如查询时希望按照Timestamp降序排序后返回100条结果,在Lucene中进行查询时,发现docID对应的doc顺序也刚好是Timestamp降序排序的,那么查询到前100个符合条件的结果即可返回,这100个一定也是Timestamp最大的100个,这就做到了提前中断。

提前中断可以极大的提升查询性能,特别是当一个查询条件命中的文档数量非常多的时候。在没有IndexSorting时,必须把所有符合条件的文档的docID扫描一遍,并且读取这些doc的一些字段来排序,选出符合条件的doc。有了IndexSorting之后,只需要选出前Top个doc即可,避免了全部扫描,性能甚至可以提升几个数量级。

indexSort的发展历史

在Elasticsearch中,IndexSorting是6.0版本才引入的。ES其实本身是一个分布式壳子,实际上基本上都是在Lucene上做的。所以也要看看在lucene中的发展历史。

在Lucene中,IndexSorting其实已经发展了一段时间。最早在10年,Lucene提供了一个IndexSorter的工具,作为一个离线工具可以对Index数据排序后生成一个新的Index。后来13年加入了SortingMergePolicy,在Segment进行merge的时候可以生成排好序的新Segment,在17年又加入了Sorting on flushed segment的功能,在Segment最初生成时就进行排序。另一方面是Lucene在查询时也做了很多优化,如果有IndexSorting,很多地方做了提前中断,后面会讲提前中断对查询性能的巨大作用。经过几次Lucene的改进和优化,IndexSorting这个功能也终于被集成进Elasticsearch。

indexSort实现原理

如何保证数据根据某个字段排序?

想要弄清楚这个问题,需要大概知道es的底层存储原理。

  1. es的一个分片是一个lucene实例。一次检索发生的最小单元是segment。segment也是lucene的概念。 es每次refresh,会产生一个新的segment。

  1. es 是日志合并树的概念,也就是写入的数据,实际上是不再发生改变。上边说的refresh,实际上就是把堆中index buffer 中的数据生成一个segment,放在os 文件存储系统上,此时还没有落磁盘。但是段一旦产生,就已经不会再改变了。想要实现indexSort 在这在segment生成前,在堆内存中,要进行排序。

  1. 定期对小段进行合并,将符合条件的小段合并成大段。所以想要实现indexSort,会有第二个问题,在segment合并的过程中,再做一次排序。这里就相当于是把两个有序的链表,合并成一个有序的链表。

每个doc写入进来之后,按照写入顺序被分配一个docID,然后被IndexingChain处理,依次要对invert index、store fields、doc values和point values进行处理,有些数据会直接写到文件里,主要是store field和term vector,其他的数据会放到memory buffer中。

首先根据设定的列排序,这个排序可以利用内存中的doc values,排序之后得到老的docID到新docID的映射,因为之前docID是按照写入顺序生成的,现在重排后,生成的是新的排列。如果排序后与原来顺序完全一致,那么什么都不做,跟之前流程一样。

如果排序后顺序发生变化,如何排序呢?对于已经写到文件中的数据,比如store field和term vector,需要从文件中读出来,重新排列后再写到一个新文件里,原来的文件就相当于一个临时文件。对于内存中的数据结构,直接在内存中重排后写到文件中。

相比没有IndexSorting时,对性能影响比较大的一块就是store field的重排,因为这部分需要从文件中读出再写回,而其他部分都是内存操作,性能影响稍小一些。

不将store field和term vector这类数据也buffer在内存中,可以减少index buffer堆的压力
那假如把store field和term vector这类数据也buffer在内存中,是否可以提升IndexSorting开启时的写入性能?

indexSort实现细节

Elasticsearch中实现索引排序(Index Sorting)的方式是在文档被索引时对文档进行排序,并将排序后的信息存储在一个隐藏的字段中。当进行搜索时,Elasticsearch会利用该字段来执行排序操作,而不需要对原始数据进行排序。

具体实现方式是在倒排索引(Inverted Index)中维护一个有序列表(Sorted List),用于存储每个词项(Term)对应的文档ID以及在该词项下的排序值。当有新文档被索引时,会将该文档的ID和排序值插入到对应的有序列表中,并按照排序值进行排序。这样,当进行搜索时,可以根据查询语句中指定的排序规则快速访问对应的有序列表,并将文档ID按照顺序返回,实现了快速的排序操作。

Elasticsearch中的倒排索引实现是基于Lucene的实现。Lucene中的倒排索引使用了类似于跳表(Skip List)的数据结构,其中每个词项对应一个有序链表,链表中的每个节点存储了文档ID和在该词项下的排序值。为了提高搜索性能,Lucene还使用了一些优化技巧,例如使用倒排表(Inverted Table)存储有序链表的起始位置以及跨度(Span),以便快速定位和跳过无关的文档。

需要注意的是,索引排序会占用额外的存储空间,并在索引文档时会增加一定的性能开销。因此,在选择使用索引排序时,需要根据具体情况权衡性能和存储需求。

indexSort 相关源码

在Elasticsearch中实现Index Sorting的主要代码位于lucene-core模块中的Sorter.java和SorterTemplate.java文件中,这些代码是在Lucene中实现排序的基础上,针对Elasticsearch的需求进行了一些修改和优化。

Index Sorting的实现主要分为两个阶段:索引时的排序和搜索时的排序。

  1. 索引时的排序

在Elasticsearch中,索引时的排序是通过修改lucene-core模块中的IndexWriter.java文件实现的。具体而言,IndexWriter在向索引中添加新文档时,会根据排序规则计算出每个文档的排序值,并将排序值存储在一个名为"_doc"的隐藏字段中。同时,IndexWriter还会在每个字段的倒排索引中添加一个TermOrdValComparator对象,用于在添加新文档时对文档进行排序。

TermOrdValComparator实现了TermOrdValComparatorSource接口,其中TermOrdValComparatorSource定义了用于计算文档排序值的getComparator方法。在Elasticsearch中,TermOrdValComparatorSource的实现类是ElasticsearchFieldComparatorSource,该类会根据排序规则创建不同的FieldComparator对象,并在FieldComparator中实现计算文档排序值的方法。

  1. 搜索时的排序

在搜索时,Elasticsearch会根据查询语句中的排序规则从lucene-core模块中获取排序器(Sorter)对象,然后利用该对象对搜索结果进行排序。具体而言,Sorter会根据查询语句中指定的排序规则获取排序字段的倒排索引,并从中获取每个文档的排序值,然后根据排序规则对文档进行排序,并返回排序后的文档列表。

总的来说,Elasticsearch的Index Sorting实现是在lucene-core模块的基础上进行的,主要是通过修改IndexWriter.java文件和创建ElasticsearchFieldComparatorSource对象来实现索引时的排序,以及通过Sorter.java文件和ElasticsearchFieldComparatorSource对象来实现搜索时的排序。

具体来说,Elasticsearch的Index Sorting的实现可以分为以下几个步骤:

  1. 在创建索引时,指定需要排序的字段以及排序规则,例如:

PUT my_index

  "mappings": 
    "properties": 
      "my_field": 
        "type": "text",
        "fields": 
          "keyword": 
            "type": "keyword",
            "ignore_above": 256
          
        
      
    
  ,
  "settings": 
    "sort": 
      "my_field.keyword": "order": "asc"
    
  

在这个例子中,我们指定了对名为"my_field.keyword"的字段按照升序进行排序。

  1. 在文档被索引时,Elasticsearch会为每个文档计算出其排序值,并将排序值存储在一个名为"_doc"的隐藏字段中。

  1. 在搜索时,根据查询语句中的排序规则创建排序器(Sorter)对象,并利用该对象对搜索结果进行排序。排序器会从倒排索引中获取每个文档的排序值,并根据排序规则对文档进行排序,最终返回排序后的文档列表。

Elasticsearch的Index Sorting实现并不复杂,但是它可以大幅提升搜索性能,特别是在需要对大量文档进行排序时。由于Index Sorting会在索引时对文档进行排序,因此可以避免在搜索时对所有文档进行排序的开销,从而大幅提升搜索性能。所以这项提升的本质是预处理技术,将需要实时计算的部分,在数据写入的时候,做一个预处理。这是其实洗升了写入的速度。牺牲了将近一倍。

在Elasticsearch的源码中,Index Sorting的实现是基于Lucene的实现进行的,具体涉及到以下几个类:

  1. ElasticsearchFieldComparatorSource

ElasticsearchFieldComparatorSource是Elasticsearch对Lucene的FieldComparatorSource接口的一个实现,用于计算文档排序值。在实现中,ElasticsearchFieldComparatorSource会根据排序规则创建不同的FieldComparator对象,其中FieldComparator实现了compare方法,用于比较文档排序值的大小。在compare方法中,ElasticsearchFieldComparatorSource会利用SortField对象中的属性信息,以及Document对象中的排序值信息来计算文档的排序值。

  1. IndexSortingConsumer

IndexSortingConsumer是Elasticsearch的一个内部类,用于在索引文档时进行排序。在实现中,IndexSortingConsumer会利用ElasticsearchFieldComparatorSource计算每个文档的排序值,并将排序值存储在一个名为"_doc"的隐藏字段中。同时,IndexSortingConsumer还会在每个字段的倒排索引中添加一个TermOrdValComparator对象,用于在添加新文档时对文档进行排序。

  1. Sorter

Sorter是Elasticsearch用于对搜索结果进行排序的核心类。在实现中,Sorter会根据查询语句中指定的排序规则获取排序字段的倒排索引,并从中获取每个文档的排序值,然后根据排序规则对文档进行排序,并返回排序后的文档列表。

  1. SortBuilder

SortBuilder是Elasticsearch用于创建排序规则的类。在实现中,SortBuilder会根据用户指定的排序规则创建SortField对象,并将SortField对象传递给Sorter进行排序。

以上这些类都是基于Lucene的实现进行的,因此其实现原理与Lucene类似。在具体实现过程中,Elasticsearch还做了一些优化,例如在排序时利用FieldDataCache缓存排序字段的倒排索引,以及在排序时利用位集操作来优化文档排序的计算过程。

参考文章:剖析Elasticsearch的IndexSorting:一种查询性能优化利器

ES源码分析Transport模块之REST的解析与处理

文章目录

Transport模块之REST的解析与处理

基于ES源码6.7.2

注册REST处理

Node ActionModule NetworkModule Netty4Plugin new ActionModule setupActions register new RestController ActionModule new NetworkModule plugin.getHttpTransports registerHttpTransport plugin.getTransports registerTransport plugin.getTransportInterceptors registerTransportInterceptor NetworkModule actionModule.initRestHandlers Node ActionModule NetworkModule Netty4Plugin

借由上图回顾一下通信模块的初始化过程,在ActionModule下对Rest请求处理进行了注册,注册过程在initRestHandlers方法中。可以发现对REST请求执行处理的类的命名是Rest*Action,同时可以发现这些处理类都继承了BaseRestHandler,而BaseRestHandler继承了RestHandler

    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) 
        List<AbstractCatAction> catActions = new ArrayList<>();
        Consumer<RestHandler> registerHandler = a -> 
            if (a instanceof AbstractCatAction) 
                catActions.add((AbstractCatAction) a);
            
        ;
        registerHandler.accept(new RestMainAction(settings, restController));
        registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
        registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
        registerHandler.accept(new RestNodesStatsAction(settings, restController));
        registerHandler.accept(new RestNodesUsageAction(settings, restController));
        registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
        registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
        registerHandler.accept(new RestClusterStatsAction(settings, restController));
        registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
        registerHandler.accept(new RestClusterHealthAction(settings, restController));
        ...
        ...
        ...
        for (ActionPlugin plugin : actionPlugins) 
            for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
                    settingsFilter, indexNameExpressionResolver, nodesInCluster)) 
                registerHandler.accept(handler);
            
        
        registerHandler.accept(new RestCatAction(settings, restController, catActions));
    

RestClusterHealthAction为例,在其构造函数中对请求头中请求方法为GET,URI为/_cluster/health和拥有占位符/_cluster/health/index的处理类为this(即自己)。

    public RestClusterHealthAction(Settings settings, RestController controller) 
        super(settings);

        controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
        controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/index", this);
    

由于继承了BaseRestHandler,所以必须实现prepareRequest方法,用于在接收到请求时,做一些前置工作,比如验证参数,转换为内部RPC请求等。

处理请求

HTTP请求执行路径

Netty4HttpRequestHandler Netty4HttpServerTransport RestController BaseRestHandler Rest*Action 内部Action channelRead0 dispatchRequest dispatchRequest tryAllHandlers dispatchRequest handleRequest prepareRequest RestChannelConsumer action.accept(channel) execute(...) sendResponse Netty4HttpRequestHandler Netty4HttpServerTransport RestController BaseRestHandler Rest*Action 内部Action

BaseRestHandlerhandleRequest方法

    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception 
        // 对请求的预处理
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) 
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParamsES源码分析Transport模块之REST的解析与处理

Android OpenGL ES 学习 – GLSurfaceView 源码解析GL线程以及自定义 EGL

Android OpenGL ES 学习 – GLSurfaceView 源码解析GL线程以及自定义 EGL

Pandas:sort_indexsort_values方法的使用

Spring MVC工作原理及源码解析 ViewResolver实现原理及源码解析

Spring MVC工作原理及源码解析DispatcherServlet实现原理及源码解析