Es7.x使用RestHighLevelClient进行增删改和批量操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Es7.x使用RestHighLevelClient进行增删改和批量操作相关的知识,希望对你有一定的参考价值。
参考技术A 整个项目可以共用一个BulkProcessor,可以配置多种刷新策略,将数据由内存刷新到es中。当设置 OpType.CREATE 时相同id插入异常看出,es进行了乐观锁控制并发写冲突。
由于设置了BulkProcessor对象,可以将数据设置到 BulkProcessor 对象中,根据策略批量的刷新到Es中。
更新操作传入的doc为map对象,而不是json字符串,否则会抛出异常。
ES实战ES6.X Join
ES6.X Join
文章目录
1、什么是join
join
属于mapping
中Field
数据类型中一种特殊字段。
2、join可以用来干什么?
可在相同索引的文档中创建父/子关系。 关系部分在文档中定义了一组可能的关系,每个关系都是父名称和子名称。
3、如何使用join?
在6.X的ES中可以在新建index的时候 设置join字段。
curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d'
"mappings":
"_doc":
"properties":
"my_join_field":
"type": "join",
"relations":
"question": "answer"
'
my_join_field
为join
的名称。relations
中的question
代表answer
的父。
在写入文档时, 分为父文档 和 子文档。
写入父文档方式一:
curl -X PUT "localhost:9200/my_index/_doc/1?refresh&pretty" -H 'Content-Type: application/json' -d'
"text": "This is a question",
"my_join_field":
"name": "question"
'
写入父文档方式二:
curl -X PUT "localhost:9200/my_index/_doc/1?refresh&pretty" -H 'Content-Type: application/json' -d'
"text": "This is a question",
"my_join_field": "question"
'
方式一与方式二的区别可以理解为父文档索引是可以对join字段名的简写,直接去掉name。
写入子文档有要求:
curl -X PUT "localhost:9200/my_index/_doc/3?routing=1&refresh&pretty" -H 'Content-Type: application/json' -d'
"text": "This is an answer",
"my_join_field":
"name": "answer",
"parent": "1"
'
curl -X PUT "localhost:9200/my_index/_doc/4?routing=1&refresh&pretty" -H 'Content-Type: application/json' -d'
"text": "This is another answer",
"my_join_field":
"name": "answer",
"parent": "1"
'
注意:
- 路由值是强制性的,因为父子文档必须在同一分片上建立索引
answer
是此子文档的加入名称。- 指定此子文档的父文档ID:1。
4、join的使用约束
- 每个索引仅允许一个join类型的mapping定义。
- 父文档和子文档必须在同一分片上建立索引。 这意味着在获取,删除或更新子文档时需要提供相同的路由值。
- 一个文档可以有多个子文档,但只能有一个父文档。
- 可以向已经存在的join类型添加新的关系。
- 当一个文档是父文档之后 也可以将子文档添加到其中。
5、join类型的检索与聚合
5.1 全量检索
curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d'
"query":
"match_all":
,
"sort": ["_id"]
'
返回值:
"took": 1,
"timed_out": false,
"_shards":
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
,
"hits":
"total": 4,
"max_score": null,
"hits": [
"_index": "my_join_index",
"_type": "_doc",
"_id": "1",
"_score": null,
"_source":
"text": "This is a question",
"my_join_field": "question"
,
"sort": [
"1"
]
,
"_index": "my_join_index",
"_type": "_doc",
"_id": "2",
"_score": null,
"_source":
"text": "This is another question",
"my_join_field": "question"
,
"sort": [
"2"
]
,
"_index": "my_join_index",
"_type": "_doc",
"_id": "3",
"_score": null,
"_routing": "1",
"_source":
"text": "This is an answer",
"my_join_field":
"name": "answer",
"parent": "1"
,
"sort": [
"3"
]
,
"_index": "my_join_index",
"_type": "_doc",
"_id": "4",
"_score": null,
"_routing": "1",
"_source":
"text": "This is another answer",
"my_join_field":
"name": "answer",
"parent": "1"
,
"sort": [
"4"
]
]
5.2 由父文档找子文档
GET my_index/_search
"query":
"has_parent" :
"parent_type" : "question",
"query" :
"match_all":
返回结果:
"took":0,
"timed_out":false,
"_shards":
"total":1,
"successful":1,
"skipped":0,
"failed":0
,
"hits":
"total":2,
"max_score":1,
"hits":[
"_index":"child_example",
"_type":"_doc",
"_id":"2",
"_score":1,
"_routing":"1",
"_source":
"join":
"name":"answer",
"parent":"1"
,
"owner":
"location":"Norfolk, United Kingdom",
"display_name":"Sam",
"id":48
,
"body":"<p>Unfortunately you're pretty much limited to FTP...",
"creation_date":"2009-05-04T13:45:37.030"
,
"_index":"child_example",
"_type":"_doc",
"_id":"3",
"_score":1,
"_routing":"1",
"_source":
"join":
"name":"answer",
"parent":"1"
,
"owner":
"location":"Norfolk, United Kingdom",
"display_name":"Troll",
"id":49
,
"body":"<p>Use Linux...",
"creation_date":"2009-05-05T13:45:37.030"
]
5.3 基于子文档找父文档
GET my_index/_search
"query":
"has_child":
"query":
"match_all":
"boost":1
,
"type":"answer",
"score_mode":"none",
"min_children":0,
"max_children":2147483647,
"ignore_unmapped":false,
"boost":1
返回结果
"took":0,
"timed_out":false,
"_shards":
"total":1,
"successful":1,
"skipped":0,
"failed":0
,
"hits":
"total":1,
"max_score":1,
"hits":[
"_index":"child_example",
"_type":"_doc",
"_id":"1",
"_score":1,
"_source":
"join":
"name":"question"
,
"body":"<p>I have Windows 2003 server and i bought a new Windows 2008 server...",
"title":"Whats the best way to file transfer my site from server to a newer one?",
"tags":[
"windows-server-2003",
"windows-server-2008",
"file-transfer"
]
]
5.4 聚合
GET my_index/_search
"query":
"parent_id":
"type": "answer",
"id": "1"
,
"aggs":
"parents":
"terms":
"field": "join#question",
"size": 10
,
"script_fields":
"parent":
"script":
"source": "doc['join#question']"
返回
"took":3,
"timed_out":false,
"_shards":
"total":1,
"successful":1,
"skipped":0,
"failed":0
,
"hits":
"total":2,
"max_score":0.13353139,
"hits":[
"_index":"child_example",
"_type":"_doc",
"_id":"2",
"_score":0.13353139,
"_routing":"1",
"fields":
"parent":[
"1"
]
,
"_index":"child_example",
"_type":"_doc",
"_id":"3",
"_score":0.13353139,
"_routing":"1",
"fields":
"parent":[
"1"
]
]
,
"aggregations":
"sterms#parents":
"doc_count_error_upper_bound":0,
"sum_other_doc_count":0,
"buckets":[
"key":"1",
"doc_count":2
]
6、join的1对多
如下,一个父文档question与多个子文档answer,comment的映射定义。
PUT join_ext_index
"mappings":
"_doc":
"properties":
"my_join_field":
"type": "join",
"relations":
"question": ["answer", "comment"]
7、join的1对多对多
PUT my_index
"mappings":
"_doc":
"properties":
"my_join_field":
"type": "join",
"relations":
"question": ["answer", "comment"],
"answer": "vote"
实现关系如下
question
/ \\
/ \\
comment answer
|
|
vote
向孙子文档写数据
PUT join_multi_index/_doc/3?routing=1&refresh
"text": "This is a vote",
"my_join_field":
"name": "vote",
"parent": "2"
注意:
- 孙子文档所在分片必须与其父母和祖父母相同
- 孙子文档的父的主键号(必须指向其父亲answer文档)
8、join的search具体实现。
在使用Java High Level REST Client的时候可以使用**HasChildQueryBuilder
,HasParentQueryBuilder
和ParentIdQueryBuilder
**来实现对join类型的检索,
QueryBuilder qb = JoinQueryBuilders.hasParentQuery(
"question",
matchAllQuery(),
true);
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices("child_example");
searchSourceBuilder.query(qb);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = ClientV672.getStringRestHighClients().get(clusterNameV6_7_2)
.search(searchRequest, RequestOptions.DEFAULT);
QueryBuilder qb = JoinQueryBuilders.hasChildQuery(
"answer",
matchAllQuery(),
ScoreMode.None);
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices("child_example");
searchSourceBuilder.query(qb);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = ClientV672.getStringRestHighClients().get(clusterNameV6_7_2)
.search(searchRequest, RequestOptions.DEFAULT);
服务端执行的search过程中的QueryPhase的阶段中的executeQueryPhase方法下
类SearchService
private void parseSource(DefaultSearchContext context, SearchSourceBuilder source) throws SearchContextException
...
if (source.query() != null)
InnerHitContextBuilder.extractInnerHits(source.query(), innerHitBuilders);
// 由queryShardContext.toQuery(source.query())对Query进行重写
context.parsedQuery(queryShardContext.toQuery(source.query()));
...
public ParsedQuery toQuery(QueryBuilder queryBuilder)
return toQuery(queryBuilder, q ->
Query query = q.toQuery(this);
if (query == null)
query = Queries.newMatchNoDocsQuery("No query left after rewrite.");
return query;
);
AbstractQueryBuilder
@Override
public final Query toQuery(QueryShardContext context) throws IOException
//
Query query = doToQuery(context);
if (query != null)
if (boost != DEFAULT_BOOST)
if (query instanceof SpanQuery)
query = new SpanBoostQuery((SpanQuery) query, boost);
else
query = new BoostQuery(query, boost);
if (queryName != null)
context.addNamedQuery(queryName, query);
return query;
HasParentQueryBuilder,HasChildQueryBuilder都集成了AbstractQueryBuilder。复写了doToQuery
方法
@Override
protected Query doToQuery(QueryShardContext context) throws IOException
// 检查索引是不是单type
if (context.getIndexSettings().isSingleType())
return joinFieldDoToQuery(context);
else
return parentFieldDoToQuery(context);
HasChildQueryBuilder下的joinFieldDoToQuery
private Query joinFieldDoToQuery(QueryShardContext context) throws IOException
ParentJoinFieldMapper joinFieldMapper = ParentJoinFieldMapper.getMapper(context.getMapperService());
if (joinFieldMapper == null)
if (ignoreUnmapped)
return new MatchNoDocsQuery();
else
throw new QueryShardException(context, "[" + NAME + "] no join field has been configured");
ParentIdFieldMapper parentIdFieldMapper = joinFieldMapper.getParentIdFieldMapper(type, false);
if (parentIdFieldMapper != null)
Query parentFilter = parentIdFieldMapper.getParentFilter();
Query childFilter = parentIdFieldMapper.getChildFilter(type);
Query innerQuery = Queries.filtered(query.toQuery(context), childFilter);
MappedFieldType fieldType = parentIdFieldMapper.fieldType();
final SortedSetDVOrdinalsIndexFieldData fieldData = context.getForField(fieldType);
Es7.x使用RestHighLevelClient进行增删改和批量操作
Es7.x使用RestHighLevelClient进行增删改和批量操作
Elasticsearch - Java API 操作 ES7.15.0ES7.x 索引,文档;高级搜索