深入elasticsearch源码之索引过程
Posted 洽洽老大
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入elasticsearch源码之索引过程相关的知识,希望对你有一定的参考价值。
调用 ES 2.2.1 的 Java API 在 ES 集群中索引一个文档。
客户端大致流程:
- 使用 XContentBuilder 构建索引的 JSON 串,也可以直接使用 JSON 字符串;
- 使用 TransportClient 连接 ES 集群;
- 将索引请求发送到集群,并获取 IndexResponse。
测试代码如下:
package index;
import java.io.IOException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.json.JsonXContentGenerator;
import com.fasterxml.jackson.core.filter.JsonPointerBasedFilter;
import es.MyTransportClient;
public class MyIndex
public static void main(String[] args)
TransportClient client = MyTransportClient.getInstance().getTransportClient();
/**
* 直接构造json
*/
// IndexResponse response = client.prepareIndex("library","book","1")
// .setSource("\\"title\\":\\"mastering elasticsearch\\"")
// .execute().actionGet();
/**
* 代码构造json
*/
XContentBuilder builder;
try
builder = JsonXContent.contentBuilder().startObject().field("user", "qiaqia")
.field("title", "this is title")
.field("subtitle", new String[] "title1", "title2", "title3" ).endObject();
IndexResponse response = client.prepareIndex("library", "book", "4").setSource(builder)
.get();
System.out.println(response.toString());
System.out.println(response.isCreated());
System.out.println(response.getVersion());
/*
* 在es存储的结果如下:
"_index": "library",
"_type": "book",
"_id": "5",
"_score": 1,
"_source":
"user": "qiaqia",
"title": "this is title",
"subtitle": [
"title1"
,
"title2"
,
"title3"
]
*/
catch (IOException e)
e.printStackTrace();
构建好 XContent 后,生成 IndexRequest。IndexRequest 封装了本次索引的操作类型、文档内容、路由、索引类型、id、时间戳、版本号、超时、TTL 等信息。
然后 IndexRequest 由 TransportService 通过 TCP 发送到集群;TransportService 底层的网络传输基于 Netty(一个异步、事件驱动的高性能网络应用框架)。
服务端流程
服务端收到请求并找到对应的 TransportAction 后,会读取集群状态,确定文档应分配到哪个分片上。
随后把请求提交到主分片处理,相关代码见 TransportIndexAction:
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable
// validate, if routing is required, that we got routing
IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required())
if (request.routing() == null)
throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard, mappingUpdatedAction);
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfterWrite(request.refresh(), indexShard, location);
return new Tuple<>(response, request);//返回操作后的IndexResponse
执行索引写入前,TransportIndexAction 会先通过 prepareIndexOperationOnPrimary 准备好要执行的索引操作:
public static Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard)
/**
将IndexRequest中的数据解析出来
*/
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
boolean canHaveDuplicates = request.canHaveDuplicates();
if (shardRequest != null)
canHaveDuplicates |= shardRequest.canHaveDuplicates();
/**
判断是索引还是创建,当opType是Index时,如果文档id存在,更新文档,否则创建文档
当opType是Create,如果文档id存在,抛出文档存在的错误
*/
if (request.opType() == IndexRequest.OpType.INDEX)
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates);
else
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
return indexShard.prepareCreateOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates, canHaveDuplicates);//调用indexShard对Lucene进行操作
在 IndexShard 中:
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, boolean canHaveDuplicates)
try
if (shardRouting.primary() == false)
throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY, state !=
IndexShardState.STARTED || canHaveDuplicates);
catch (Throwable t)
verifyNotClosed(t);
throw t;
static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine
.Operation.Origin origin, boolean canHaveDuplicates)
long startTime = System.nanoTime();
/**
解析json为ParsedDocument
*/
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
if (docMapper.getMapping() != null)
doc.addDynamicMappingsUpdate(docMapper.getMapping());
//写入Lucene
return new Engine.Index(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType,
origin, startTime, canHaveDuplicates);
有关如何跟Lucene底层进行数据交互的问题,由于本人刚入门ES,也没读过Lucene源码,所以等以后有时间再补充好了。
以上是关于深入elasticsearch源码之索引过程的主要内容,如果未能解决你的问题,请参考以下文章