Es7.x使用RestHighLevelClient进行增删改和批量操作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Es7.x使用RestHighLevelClient进行增删改和批量操作相关的知识,希望对你有一定的参考价值。

参考技术A 整个项目可以共用一个BulkProcessor,可以配置多种刷新策略,将数据由内存刷新到es中。

当设置 OpType.CREATE 时相同id插入异常看出,es进行了乐观锁控制并发写冲突。

由于设置了BulkProcessor对象,可以将数据设置到 BulkProcessor 对象中,根据策略批量的刷新到Es中。

更新操作传入的doc为map对象,而不是json字符串,否则会抛出异常。

ES实战ES6.X Join

ES6.X Join

文章目录

1、什么是join

join 属于mappingField数据类型中一种特殊字段。

2、join可以用来干什么?

可在相同索引的文档中创建父/子关系。 关系部分在文档中定义了一组可能的关系,每个关系都是父名称和子名称。

3、如何使用join?

在6.X的ES中可以在新建index的时候 设置join字段。

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d'

  "mappings": 
    "_doc": 
      "properties": 
        "my_join_field":  
          "type": "join",
          "relations": 
            "question": "answer" 
          
        
      
    
  

'

my_join_fieldjoin的名称。relations 中的question代表answer的父。

在写入文档时, 分为父文档 和 子文档。

写入父文档方式一:

curl -X PUT "localhost:9200/my_index/_doc/1?refresh&pretty" -H 'Content-Type: application/json' -d'
   
     "text": "This is a question",
     "my_join_field": 
       "name": "question" 
     
   
   '

写入父文档方式二:

curl -X PUT "localhost:9200/my_index/_doc/1?refresh&pretty" -H 'Content-Type: application/json' -d'

  "text": "This is a question",
  "my_join_field": "question" 

'

方式一与方式二的区别可以理解为父文档索引是可以对join字段名的简写,直接去掉name。

写入子文档有要求:

curl -X PUT "localhost:9200/my_index/_doc/3?routing=1&refresh&pretty" -H 'Content-Type: application/json' -d'

  "text": "This is an answer",
  "my_join_field": 
    "name": "answer", 
    "parent": "1"
  

'

curl -X PUT "localhost:9200/my_index/_doc/4?routing=1&refresh&pretty" -H 'Content-Type: application/json' -d'

  "text": "This is another answer",
  "my_join_field": 
    "name": "answer",
    "parent": "1"
  

'

注意:

  1. 路由值是强制性的,因为父子文档必须在同一分片上建立索引
  2. answer 是此子文档的加入名称。
  3. 指定此子文档的父文档ID:1。

4、join的使用约束

  • 每个索引仅允许一个join类型的mapping定义。
  • 父文档和子文档必须在同一分片上建立索引。 这意味着在获取,删除或更新子文档时需要提供相同的路由值。
  • 一个文档可以有多个子文档,但只能有一个父文档。
  • 可以向已经存在的join类型添加新的关系。
  • 当一个文档是父文档之后 也可以将子文档添加到其中。

5、join类型的检索与聚合

5.1 全量检索

curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d'

  "query": 
    "match_all": 
  ,
  "sort": ["_id"]

'

返回值:


  "took": 1,
  "timed_out": false,
  "_shards": 
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  ,
  "hits": 
    "total": 4,
    "max_score": null,
    "hits": [
      
        "_index": "my_join_index",
        "_type": "_doc",
        "_id": "1",
        "_score": null,
        "_source": 
          "text": "This is a question",
          "my_join_field": "question"
        ,
        "sort": [
          "1"
        ]
      ,
      
        "_index": "my_join_index",
        "_type": "_doc",
        "_id": "2",
        "_score": null,
        "_source": 
          "text": "This is another question",
          "my_join_field": "question"
        ,
        "sort": [
          "2"
        ]
      ,
      
        "_index": "my_join_index",
        "_type": "_doc",
        "_id": "3",
        "_score": null,
        "_routing": "1",
        "_source": 
          "text": "This is an answer",
          "my_join_field": 
            "name": "answer",
            "parent": "1" 
        ,
        "sort": [
          "3"
        ]
      ,
      
        "_index": "my_join_index",
        "_type": "_doc",
        "_id": "4",
        "_score": null,
        "_routing": "1",
        "_source": 
          "text": "This is another answer",
          "my_join_field": 
            "name": "answer",
            "parent": "1" 
        ,
        "sort": [
          "4"
        ]
      
    ]
  

5.2 由父文档找子文档

GET my_index/_search

    "query": 
        "has_parent" : 
            "parent_type" : "question",
            "query" : 
                "match_all": 
            
        
    

返回结果:


    "took":0,
    "timed_out":false,
    "_shards":
        "total":1,
        "successful":1,
        "skipped":0,
        "failed":0
    ,
    "hits":
        "total":2,
        "max_score":1,
        "hits":[
            
                "_index":"child_example",
                "_type":"_doc",
                "_id":"2",
                "_score":1,
                "_routing":"1",
                "_source":
                    "join":
                        "name":"answer",
                        "parent":"1"
                    ,
                    "owner":
                        "location":"Norfolk, United Kingdom",
                        "display_name":"Sam",
                        "id":48
                    ,
                    "body":"<p>Unfortunately you're pretty much limited to FTP...",
                    "creation_date":"2009-05-04T13:45:37.030"
                
            ,
            
                "_index":"child_example",
                "_type":"_doc",
                "_id":"3",
                "_score":1,
                "_routing":"1",
                "_source":
                    "join":
                        "name":"answer",
                        "parent":"1"
                    ,
                    "owner":
                        "location":"Norfolk, United Kingdom",
                        "display_name":"Troll",
                        "id":49
                    ,
                    "body":"<p>Use Linux...",
                    "creation_date":"2009-05-05T13:45:37.030"
                
            
        ]
    

5.3 基于子文档找父文档

GET my_index/_search

    "query":
        "has_child":
            "query":
                "match_all":
                    "boost":1
                
            ,
            "type":"answer",
            "score_mode":"none",
            "min_children":0,
            "max_children":2147483647,
            "ignore_unmapped":false,
            "boost":1
        
    

返回结果


    "took":0,
    "timed_out":false,
    "_shards":
        "total":1,
        "successful":1,
        "skipped":0,
        "failed":0
    ,
    "hits":
        "total":1,
        "max_score":1,
        "hits":[
            
                "_index":"child_example",
                "_type":"_doc",
                "_id":"1",
                "_score":1,
                "_source":
                    "join":
                        "name":"question"
                    ,
                    "body":"<p>I have Windows 2003 server and i bought a new Windows 2008 server...",
                    "title":"Whats the best way to file transfer my site from server to a newer one?",
                    "tags":[
                        "windows-server-2003",
                        "windows-server-2008",
                        "file-transfer"
                    ]
                
            
        ]
    

5.4 聚合

GET my_index/_search

  "query": 
    "parent_id":  
      "type": "answer",
      "id": "1"
    
  ,
  "aggs": 
    "parents": 
      "terms": 
        "field": "join#question", 
        "size": 10
      
    
  ,
  "script_fields": 
    "parent": 
      "script": 
         "source": "doc['join#question']" 
      
    
  

返回


    "took":3,
    "timed_out":false,
    "_shards":
        "total":1,
        "successful":1,
        "skipped":0,
        "failed":0
    ,
    "hits":
        "total":2,
        "max_score":0.13353139,
        "hits":[
            
                "_index":"child_example",
                "_type":"_doc",
                "_id":"2",
                "_score":0.13353139,
                "_routing":"1",
                "fields":
                    "parent":[
                        "1"
                    ]
                
            ,
            
                "_index":"child_example",
                "_type":"_doc",
                "_id":"3",
                "_score":0.13353139,
                "_routing":"1",
                "fields":
                    "parent":[
                        "1"
                    ]
                
            
        ]
    ,
    "aggregations":
        "sterms#parents":
            "doc_count_error_upper_bound":0,
            "sum_other_doc_count":0,
            "buckets":[
                
                    "key":"1",
                    "doc_count":2
                
            ]
        
    

6、join的1对多

如下,一个父文档question与多个子文档answer,comment的映射定义。

PUT join_ext_index

  "mappings": 
    "_doc": 
      "properties": 
        "my_join_field": 
          "type": "join",
          "relations": 
            "question": ["answer", "comment"]  
          
        
      
    
  

7、join的1对多对多

PUT my_index

  "mappings": 
    "_doc": 
      "properties": 
        "my_join_field": 
          "type": "join",
          "relations": 
            "question": ["answer", "comment"],  
            "answer": "vote" 
          
        
      
    
  

实现关系如下

   question
    /    \\
   /      \\
comment  answer
           |
           |
          vote

向孙子文档写数据

PUT join_multi_index/_doc/3?routing=1&refresh 

  "text": "This is a vote",
  "my_join_field": 
    "name": "vote",
    "parent": "2" 
  

注意:

  • 孙子文档所在分片必须与其父母和祖父母相同
  • 孙子文档的父的主键号(必须指向其父亲answer文档)

8、join的search具体实现。

在使用Java High Level REST Client的时候可以使用**HasChildQueryBuilder,HasParentQueryBuilderParentIdQueryBuilder**来实现对join类型的检索,

 QueryBuilder qb = JoinQueryBuilders.hasParentQuery(
                    "question",
                    matchAllQuery(),
                    true);
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices("child_example");
searchSourceBuilder.query(qb);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = ClientV672.getStringRestHighClients().get(clusterNameV6_7_2)
                    .search(searchRequest, RequestOptions.DEFAULT);
            QueryBuilder qb = JoinQueryBuilders.hasChildQuery(
                    "answer",
                    matchAllQuery(),
                    ScoreMode.None);
            SearchRequest searchRequest = new SearchRequest();
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchRequest.indices("child_example");
            searchSourceBuilder.query(qb);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = ClientV672.getStringRestHighClients().get(clusterNameV6_7_2)
                    .search(searchRequest, RequestOptions.DEFAULT);

服务端执行的search过程中的QueryPhase的阶段中的executeQueryPhase方法下

类SearchService

private void parseSource(DefaultSearchContext context, SearchSourceBuilder source) throws SearchContextException 
    ...
        
    if (source.query() != null) 
            InnerHitContextBuilder.extractInnerHits(source.query(), innerHitBuilders);
            // 由queryShardContext.toQuery(source.query())对Query进行重写
            context.parsedQuery(queryShardContext.toQuery(source.query()));
        
    
    ...
    

 public ParsedQuery toQuery(QueryBuilder queryBuilder) 
        return toQuery(queryBuilder, q -> 
            Query query = q.toQuery(this);
            if (query == null) 
                query = Queries.newMatchNoDocsQuery("No query left after rewrite.");
            
            return query;
        );
    

AbstractQueryBuilder

    @Override
    public final Query toQuery(QueryShardContext context) throws IOException 
        // 
        Query query = doToQuery(context);
        if (query != null) 
            if (boost != DEFAULT_BOOST) 
                if (query instanceof SpanQuery) 
                    query = new SpanBoostQuery((SpanQuery) query, boost);
                 else 
                    query = new BoostQuery(query, boost);
                
            
            if (queryName != null) 
                context.addNamedQuery(queryName, query);
            
        
        return query;
    

HasParentQueryBuilder,HasChildQueryBuilder都集成了AbstractQueryBuilder。复写了doToQuery方法

    @Override
    protected Query doToQuery(QueryShardContext context) throws IOException 
        // 检查索引是不是单type
        if (context.getIndexSettings().isSingleType()) 
            return joinFieldDoToQuery(context);
         else 
            return parentFieldDoToQuery(context);
        
    

HasChildQueryBuilder下的joinFieldDoToQuery

    private Query joinFieldDoToQuery(QueryShardContext context) throws IOException 
        ParentJoinFieldMapper joinFieldMapper = ParentJoinFieldMapper.getMapper(context.getMapperService());
        if (joinFieldMapper == null) 
            if (ignoreUnmapped) 
                return new MatchNoDocsQuery();
             else 
                throw new QueryShardException(context, "[" + NAME + "] no join field has been configured");
            
        

        ParentIdFieldMapper parentIdFieldMapper = joinFieldMapper.getParentIdFieldMapper(type, false);
        if (parentIdFieldMapper != null) 
            Query parentFilter = parentIdFieldMapper.getParentFilter();
            Query childFilter = parentIdFieldMapper.getChildFilter(type);
            Query innerQuery = Queries.filtered(query.toQuery(context), childFilter);
            MappedFieldType fieldType = parentIdFieldMapper.fieldType();
            final SortedSetDVOrdinalsIndexFieldData fieldData = context.getForField(fieldType);
            Es7.x使用RestHighLevelClient进行增删改和批量操作

es7.x—查询篇

Es7.x使用RestHighLevelClient进行增删改和批量操作

Elasticsearch - Java API 操作 ES7.15.0ES7.x 索引,文档;高级搜索

Es7.x使用RestHighLevelClient的3种分页实现

Elasticsearch - Java API 操作 ES7.16.0+ES8.x 索引,文档;高级搜索