Elasticsearch源码分析-索引分析(一)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch源码分析-索引分析(一)相关的知识,希望对你有一定的参考价值。

参考技术A 首先,我们来看一个索引请求:

这个请求的主要作用是向item索引中添加一个索引文档,文档信息:
文档 id: 28589790
字段id: 28589790
字段text: 这是一个索引文本
如果索引中已经包含id为28589790的索引,elasticsearch将会使用这条数据进行覆盖

elasticsearch使用HttpRequestHandler.messageReceived()方法接受用户请求,然后调用dispatchRequest()方法对请求进行转发。
当请求跳转到RestController时,会调用getHandler()方法根据请求的Path获取对应的handler,由上文可以看出 item/show/28589790 会匹配到RestIndexAction

handler.handleRequest()方法最终会调用RestIndexAction.handleRequest()方法对索引参数进行解析,创建索引请求对象indexRequest,然后调用client.index()开始创建索引

在索引请求中,支持下列参数:
routing: 路由信息,具有相同路由信息的文档存储在同一分片上
parent: 文档的parent id, 如果未设置路由,则会自动将其设置为路由
timestamp: 文档产生的时间戳
ttl: 过期时间
timeout: 超时时间
refresh: 此索引操作之后是否执行刷新,从而使文档可被搜索,默认为false
version: 文档的版本号
version_type: 版本类型,默认internal,支持internal、external、external_gt、external_gte和force
op_type: 索引操作类型,支持create和index
replication: 副本类型,支持async、sync和default
consistency: 一致性,支持one、quorum、all和default
请求的content即索引的source,文档内容
在封装完索引请求后,就要调用 client.index() 执行索引

在index()方法中,使用的Action是IndexAction.INSTANCE

这个action在ActionModule中被TransportIndexAction注册

因此在NodeClient的execute()方法中根据action获取到的transport action为TransportIndexAction

由于TransportIndexAction继承了TransportAction,因此调用过程为NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute()
索引的大体流程为:先判断是否需要创建索引,如果是则先创建索引,然后写入文档数据,否则直接写入文档数据

elasticsearch主要使用AutoCreateIndex.shouldAutoCreate()方法来判断是否需要创建索引

其中参数和globallyDisabled的含义:
action.auto_create_index: elasticsearch配置文件的的配置项,表示是否允许创建索引
needToCheck: 是否需要检查能否创建索引,只有当action.auto_create_index为false时不需要检查,直接返回无法创建索引
globallyDisabled: 是否全局禁用创建索引,只有当action.auto_create_index为false时全局禁用创建索引,直接返回无法创建索引
如果当前集群中已经包含了要创建的索引,那么也不需要创建索引。其他情况则根据action.auto_create_index配置的正则表达式来判断
如果允许创建索引,则开始创建索引名的流程

首先创建 创建索引 的请求createIndexRequest,设置了4个参数,分别是索引名index、索引mapping、创建索引的原因cause和master节点超时时间masterNodeTimeout

然后开始调用createIndexAction.execute()方法创建索引名

从下面的类图可以看出,TransportCreateIndexAction继承了TransportMasterNodeOperation,调用过程即TransportAction.execute()-> TransportMasterNodeOperation.doExecute()方法来完成操作

在TransportMasterNodeOperation中主要是保证操作在master节点上执行

这个操作主要保证了两点:
(1)如果当前节点不是master,则将请求发送到master节点执行masterOperation()方法
(2)如果当前集群block了,则等待集群状态更新,然后重新执行完整的innerExecute()方法

然后进入到TransportCreateIndexAction.masterOperation()方法中,创建CreateIndexClusterStateUpdateRequest对象,用来创建索引时更新集群状态信息的请求,其中settings和mappings及aliases默认为空集合

然后调用MetaDataCreateIndexService的createIndex()方法,如果能获取到锁信息则直接执行重载的createIndex()方法,否则交给线程池去执行

在重载从createIndex()方法中,通过提交一个更新集群状态的任务来实现创建索引的具体逻辑

提交StateUpdateTask任务时,会创建一个UpdateTask对象,然后执行其run()方法,即MetaDataCreateIndexService中创建的AckedClusterStateUpdateTask匿名对象

在UpdateTask的run()方法中,会调用ClusterStateUpdateTask.execute()方法获取新的集群状态,

在完成索引创建完成后,集群状态信息会发生变化,elasticsearch会将这个变化发布到其他节点,以维持集群统一的状态信息

《Elasticsearch 源码解析与优化实战》第17章:Shrink原理分析

一、简介

官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html

索引分片数量一般在模板中统一定义,在数据规模比较大的索引中,索引分片数一般也大一些,在笔者的集群中设置为24。同时按天生成新的索引,使用别名关联。但是,并非每天的索引数据量都很大,小数据量的索引同样有较大的分片数。在ES中,主节点管理分片是很大的工作量,降低集群整体分片数量可以减少recovery时间,减小集群状态的大小。因此,可以使用Shrink API缩小索引分片数。当索引缩小完成后,源索引可以删除。

**Shrink API****ES 5.0****之后提供的新功能,其可以缩小主分片数量。但其并不对源索引直接进行缩小操作,而是使用与源索引相同的配置创建一个新索引,仅降低分片数。由于添加新文档时使用对分片数量取余获取目的分片的关系,新索引的主分片数必须是源索引主分片数的因数。**例如,8个分片可以缩小到4、2、1个分片。如果源索引的分片数为素数,则目标索引的分片数只能为1。

下面举一个例子来分析缩小过程。

1.1、准备源索引

创建索引: my_source_index, 包括5个主分片和1个副分片,并写入几条测试数据通过下面的命令,将索引标记为只读,且所有分片副本都迁移到名为 node-idea 的节点上。

注意,“所有分片副本”不指索引的全部分片,无论主分片还是副分片,任意一个就可以。分配器也不允许将主副分片分配到同-节点。

PUT my_source_index
{
    "settings": {
        "index.number_of_replicas": 0, # 移除源索引的副本
        "index.routing.allocation.require._name": "node-idea", # 将源索引的分片迁移到同一个节点上
        "index.blocks.write": true  # 设置索引为只读模式
    }
}

选项index.blocks.write设置为 true 来禁止对索引的写操作。但索引的 metadata 可以正常写。

It can take a while to relocate the source index. Progress can be tracked with the_cat recoveryAPI, or thecluster healthAPIcan be used to wait until all shards have relocated with thewait_for_no_relocating_shardsparameter.

1.2、缩小索引

待分片迁移完毕,我们就可以执行执行Shrink操作了:

以上代码将创建含有一个主分片和一个副分片的目的索引my_target_index

缩小索引需要满足下列要求:

  • 目标索引存在
  • 索引状态必须是Green
  • 原索引主分片数量比目标索引多,且原索引主分片数量是目标索引倍数
  • 索引中的所有文档在目标索引将会被缩小到一个分片的数量不会超过 2,147,483,519 ,因为这是一个分片的承受的最大文档数量。
  • 执行缩小进程的节点必须要有足够的空闲磁盘空间满足原索引的分片能够全部复制迁徙到该节点。

三、Shrink的工作原理

引用官方手册对Shrink工作过程的描述:

  • 以相同配置创建目标索引,但是降低主分片数量
  • 从源索引的Lucene分段创建硬链接到目的索引。如果系统不支持硬链接,那么索引的所有分段都将复制到新索引,将会花费大量时间
  • 对目标索引执行恢复操作,就像一个关闭的索引重新打开时一样

3.1、创建新索引

使用旧索引的配置创建新索引,只是减少主分片的数量,所有副本都迁移到同一个节点。显然,创建硬链接时,源文件和目标文件必须在同一台主机。

3.2、创建硬链接

从源索引到目的索引创建硬链接。如果操作系统不支持硬链接,则复制Lucene分段。

在Linux下通过strace命令跟踪硬链接创建过程:

strace -e trace=file -p {pid}

Linux下的strace命令用于跟踪系统调用,trace=file表示只跟踪与文件操作相关的系统调用,关于该命令的完整使用方式请求可参考man手册。在strace命令的输出结果中,我们能清晰看到内部过程:

  • JYglvWRnSqmNgA3E1CahZw为源索引;
  • RvDP65d-QD-QTpwOCaLWOg为目的索引;
  • 0.cfe、0.si、_0.cfs 类型的文件为Lucene编号为0的segment,编号依次类推;

链接过程:从源索引的shard[0]开始,遍历所有shard,将所有segment链接到目的索引,目的索引的segment从0开始命名,依次递增。在本例中,由于源索引的shard[0]没有数据,因此从shard[ 1]开始链接。

3.2.1、为什么一定要硬链接,不使用软链接?

Linux的文件系统由两部分组成(实际上任何文件系统的基本概念都相似):inode和block。block用于存储用户数据,inode 用于记录元数据,系统通过inode 定位唯一的文件。

  • 硬链接:文件有相同的inode和block。
  • 软链接:文件有独立的inode和block,block 内容为目的文件路径名。

那么为什么一定要硬链接过去呢?从本质上来说,我们需要保证Shrink之后,源索引和目的索引是完全独立的,读写和删除都不应该互相影响。**如果软链接过去,删除源索引,则目的索引的数据也会被删除,硬链接则不会。**满足下面条件时操作系统才真正删除文件:

文件被打开的fd数量为0且硬链接数量为0。

使用硬链接,删除源索引,只是将文件的硬链接数量减1,删除源索引和目的索引中的任何一个,都不影响另一个正常读写。

由于使用了硬链接,也因为硬链接的特性带来一些限制:不能交叉文件系统或分区进行硬链接的创建,因为不同分区和文件系统有自己的inode。

不过,既然都是链接,Shrink 完成后,修改源索引,目的索引会变吗?答案是不会。虽然链接到了源分段,Shrink期间索引只读,目标索引能看到的只有源索引的当前数据,Shrink 完成后,由于Lucene中分段的不变性,“write once”机制保证每个文件都不会被更新。源索引新写入的数据随着refresh会生成新分段,而新分段没有链接,在目标索引中是看不到的。如果源索引进行merge,对源分段执行删除时,只是硬链接数量减1,目标索引仍然不受影响。因此,Shrink完毕后最终的效果就是,两个索引的数据看起来是完全独立的。

经过链接过程之后,主分片已经就绪,副分片还是空的,通过recovery 将主分片数据复制到副分片。下面看一下相关实现代码

3.2、硬链接过程源码分析

硬链接过程在目标索引my_target_index 的恢复流程中,入口为IndexShard#startRecovery,有下列几种类型的recovery:

  • EXISTING_STORE,主分片从translog恢复;
  • PEER,副分片从主分片远程拉取;
  • SNAPSHOT,从快照中恢复;
  • LOCAL_SHARDS,从同一个节点的其他分片恢复Shrink使用这种恢复类型;

shrink index 时的恢复类型为LOCAL_SHARDS,执行storeRecovery.recoverFromLocalShards

addIndices中,调用Lucene中的org.apache.lucene.store.HardlinkCopyDirectoryWrapper实现硬链接。

addIndices将整个源索引的全部shard链接到目标路径:

addIndices (RecoveryState.Index indexRecoveryStats, Directory target, Directory. .. sources)

本例中源索引有5个分片,sources 值如下:

0 = "(store (mmapfs (/V/ idea/ nodes/0/ indices/-Puacb8gSQG4UAvr -vNopQ/0/index)))"
1 = " (store (mmapfs (/V/idea/ nodes/0/ indices/- Puacb8gSQG4UAvr -vNopQ/1/index)))"
2 = " (store (mmapfs (/V/idea/ nodes/0/ indices/-Puacb8gSQG4UAvr-vN0opQ/2/index)))"
3 = " (store (mmapfs (/V/ idea/ nodes/0/ indices/ -Puacb8gSQG4UAvr-vNopQ/3/ index)))"
4 ="(store (mmapfs (/V/idea/ nodes/0/ indices/-Puacb8gSQG4UAvr-vN0pQ/4/index)))"

target值如下:

store(mmapfs (/Volumes/RamDisk/idea/nodes/0/indices/Dcfi3m9kTW2Dfc2zUjMOoQ/0/index))

关注我的公众号【宝哥大数据】

以上是关于Elasticsearch源码分析-索引分析(一)的主要内容,如果未能解决你的问题,请参考以下文章

《Elasticsearch 源码解析与优化实战》第17章:Shrink原理分析

ElasticSearch Index操作源码分析

《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析

《Elasticsearch 源码解析与优化实战》第10章:索引恢复流程分析

Elasticsearch之源码分析(shard分片规则)

《Elasticsearch 源码解析与优化实战》第11章:gateway 模块分析