深入elasticsearch源码之索引过程

Posted 洽洽老大

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入elasticsearch源码之索引过程相关的知识,希望对你有一定的参考价值。

调用es 2..2.1 的 java Api在ES集群中索引一个文档

客户端大致流程:

  1. 使用XContentBuilder构建索引的json串,也可直接用json字符串
  2. 使用TransportClient连接ES集群
  3. 发送索引到集群并获取IndexResponse

测试代码如下:

package index;

import java.io.IOException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.json.JsonXContentGenerator;
import com.fasterxml.jackson.core.filter.JsonPointerBasedFilter;
import es.MyTransportClient;

public class MyIndex 

    public static void main(String[] args) 
        TransportClient client = MyTransportClient.getInstance().getTransportClient();
        /**
         * 直接构造json
         */
        // IndexResponse response = client.prepareIndex("library","book","1")
        // .setSource("\\"title\\":\\"mastering elasticsearch\\"")
        // .execute().actionGet();
        /**
         * 代码构造json
         */
        XContentBuilder builder;
        try 
            builder = JsonXContent.contentBuilder().startObject().field("user", "qiaqia")
                    .field("title", "this is title")
                    .field("subtitle", new String[]  "title1", "title2", "title3" ).endObject();
            IndexResponse response = client.prepareIndex("library", "book", "4").setSource(builder)
                    .get();
            System.out.println(response.toString());
            System.out.println(response.isCreated());
            System.out.println(response.getVersion());

            /*
             * 在es存储的结果如下:
             
                "_index": "library",
                "_type": "book",
                "_id": "5",
                "_score": 1,
                "_source": 
                        "user": "qiaqia",
                        "title": "this is title",
                        "subtitle": [
                            "title1"
                            ,
                            "title2"
                            ,
                            "title3"
                        ]
                    
                
             */

         catch (IOException e) 
            e.printStackTrace();
        

    


构建好XContent后,生成IndexRequest,IndexRequest封装了索引的操作,索引内容,路由,索引类型,id, 时间戳,版本号,超时,ttl等等信息
然后将IndexRequestTransportService通过tcp发送到集群,TransportService封装了异步,事件驱动的高性能网络应用程序框架Netty

服务端流程

获取到TransportAction后,读取集群状态,确定数据分配到哪个分片上。

把请求提交到主分片处理,可查看TransportIndexAction

  protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable 

        // validate, if routing is required, that we got routing
        IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) 
            if (request.routing() == null) 
                throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
            
        

        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());

        final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard, mappingUpdatedAction);
        final IndexResponse response = result.response;
        final Translog.Location location = result.location;
        processAfterWrite(request.refresh(), indexShard, location);
        return new Tuple<>(response, request);//返回操作后的IndexResponse
    

执行索引写入前,TransportIndexAction

public static Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) 
/**
 将IndexRequest中的数据解析出来
*/
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
            .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        boolean canHaveDuplicates = request.canHaveDuplicates();
        if (shardRequest != null) 
            canHaveDuplicates |= shardRequest.canHaveDuplicates();
        
    /**
          判断是索引还是创建,当opType是Index时,如果文档id存在,更新文档,否则创建文档
          当opType是Create,如果文档id存在,抛出文档存在的错误
    */
        if (request.opType() == IndexRequest.OpType.INDEX) 
            return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates);
         else 
            assert request.opType() == IndexRequest.OpType.CREATE : request.opType();

            return indexShard.prepareCreateOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates, canHaveDuplicates);//调用indexShard对Lucene进行操作
        
    

IndexShard中,

public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, boolean canHaveDuplicates) 
        try 
            if (shardRouting.primary() == false) 
                throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
            
            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY, state !=
                    IndexShardState.STARTED || canHaveDuplicates);
         catch (Throwable t) 
            verifyNotClosed(t);
            throw t;
        
    
static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine
            .Operation.Origin origin, boolean canHaveDuplicates) 
        long startTime = System.nanoTime();
        /**
       解析json为ParsedDocument
    */
        ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
        if (docMapper.getMapping() != null) 
            doc.addDynamicMappingsUpdate(docMapper.getMapping());
        
        //写入Lucene
        return new Engine.Index(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType,
                origin, startTime, canHaveDuplicates);
    

有关如何跟Lucene底层进行数据交互的问题,由于本人刚入门ES,也没读过Lucene源码,所以等以后有时间再补充好了。

以上是关于深入elasticsearch源码之索引过程的主要内容,如果未能解决你的问题,请参考以下文章

深入Elasticsearch:索引的创建

深入Elasticsearch:索引的创建

深入Elasticsearch:索引的创建

深入Elasticsearch:索引的创建

Elasticsearch之源码分析(shard分片规则)

elasticsearch index 之 create index(-)