使用Java High Level REST Client操作elasticsearch

Posted Boblim

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Java High Level REST Client操作elasticsearch相关的知识,希望对你有一定的参考价值。

说明

在明确了ES的基本概念和使用方法后,我们来学习如何使用ES的Java API.
本文假设你已经对ES的基本概念已经有了一个比较全面的认识。

客户端

你可以用Java客户端做很多事情:

  • 执行标准的index,get,delete,update,search等操作。
  • 在正在运行的集群上执行管理任务。

但是,通过官方文档可以得知,现在存在至少三种Java客户端。

  1. Transport Client
  2. Java High Level REST Client
  3. Java Low Level Rest Client

造成这种混乱的原因是:

  • 长久以来,ES并没有官方的Java客户端,并且Java自身是可以简单支持ES的API的,于是就先做成了TransportClient。但是TransportClient的缺点是显而易见的,它没有使用RESTful风格的接口,而是二进制的方式传输数据。

  • 之后ES官方推出了Java Low Level REST Client,它支持RESTful,用起来也不错。但是缺点也很明显,因为TransportClient的使用者把代码迁移到Low Level REST Client的工作量比较大。官方文档专门为迁移代码出了一堆文档来提供参考。

  • 现在ES官方推出Java High Level REST Client,它是基于Java Low Level REST Client的封装,并且API接收参数和返回值和TransportClient是一样的,使得代码迁移变得容易并且支持了RESTful的风格,兼容了这两种客户端的优点。当然缺点是存在的,就是版本的问题。ES的小版本更新非常频繁,在最理想的情况下,客户端的版本要和ES的版本一致(至少主版本号一致),次版本号不一致的话,基本操作也许可以,但是新API就不支持了。

  • 强烈建议ES5及其以后的版本使用Java High Level REST Client。笔者这里使用的是ES5.6.3,下面的文章将基于JDK1.8+Spring Boot+ES5.6.3 Java High Level REST Client+Maven进行示例。

前置条件:

  • JDK1.8
  • elasticsearch 6.3.2(其他版本未做测试,不保证完全兼容)
  • maven 
  • spring boot
  • 1.maven依赖:
        <!--elasticsearch base-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java Low Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java High Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.2</version>
        </dependency>
  • 2.接入rest-higl-level-client
 1 import org.apache.http.HttpHost;
 2 import org.apache.http.auth.AuthScope;
 3 import org.apache.http.auth.UsernamePasswordCredentials;
 4 import org.apache.http.client.CredentialsProvider;
 5 import org.apache.http.impl.client.BasicCredentialsProvider;
 6 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 7 import org.elasticsearch.client.RestClient;
 8 import org.elasticsearch.client.RestClientBuilder;
 9 import org.elasticsearch.client.RestHighLevelClient;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.springframework.beans.factory.DisposableBean;
13 import org.springframework.beans.factory.FactoryBean;
14 import org.springframework.beans.factory.InitializingBean;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.context.annotation.Configuration;
17 
18 @Configuration
19 public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
20     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
21 
22     @Value("${spring.data.elasticsearch.host}")
23     private String host;
24     @Value("${spring.data.elasticsearch.port}")
25     private int port;
26     @Value("${spring.data.elasticsearch.username}")
27     private String username;
28     @Value("${spring.data.elasticsearch.password}")
29     private String password;
30 
31     private RestHighLevelClient restHighLevelClient;
32 
33     @Override
34     public void destroy() throws Exception {
35         try {
36             LOGGER.info("Closing elasticSearch client");
37             if (restHighLevelClient != null) {
38                 restHighLevelClient.close();
39             }
40         } catch (final Exception e) {
41             LOGGER.error("Error closing ElasticSearch client: ", e);
42         }
43     }
44 
45     @Override
46     public RestHighLevelClient getObject() throws Exception {
47         return restHighLevelClient;
48     }
49 
50     @Override
51     public Class<RestHighLevelClient> getObjectType() {
52         return RestHighLevelClient.class;
53     }
54 
55     @Override
56     public boolean isSingleton() {
57         return false;
58     }
59 
60     @Override
61     public void afterPropertiesSet() throws Exception {
62         buildClient();
63     }
64 
65     protected void buildClient() {
66         final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
67         credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
68         RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
69                 .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
70                     @Override
71                     public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
72                         return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
73                     }
74                 });
75 
76         restHighLevelClient = new RestHighLevelClient(builder);
77     }
78 
79 }
  • 3.index api
1 Map<String, Object> jsonMap = new HashMap<>();
2 jsonMap.put("user", "laimailai");
3 jsonMap.put("postDate", new Date());
4 jsonMap.put("message", "trying out Elasticsearch");
5 IndexRequest indexRequest = new IndexRequest("index", "type", "1")
6         .source(jsonMap);
7 IndexResponse indexResponse = client.index(request);
  • 4.get api
1 GetRequest getRequest = new GetRequest(
2         "index",
3         "type",
4         "1");
5 GetResponse getResponse = client.get(request);
  • 5.update api
1 UpdateRequest request = new UpdateRequest(
2         "index",
3         "type",
4         "1");
5 UpdateResponse updateResponse = client.update(request);
  • 6.delete api
1 DeleteRequest request = new DeleteRequest(
2         "index",    
3         "type",     
4         "1");
  • 7.bulk api

之前的文档说明过,bulk接口是批量index/update/delete操作
在API中,只需要一个bulk request就可以完成一批请求。

 1 //1.bulk
 2 BulkRequest request = new BulkRequest();
 3 request.add(new IndexRequest("index", "type", "1")
 4         .source(XContentType.JSON, "field", "foo"));
 5 request.add(new IndexRequest("index", "type", "2")
 6         .source(XContentType.JSON, "field", "bar"));
 7 request.add(new IndexRequest("index", "type", "3")
 8         .source(XContentType.JSON, "field", "baz"));
 9 
10 //同步
11 BulkResponse bulkResponse = client.bulk(request);
12 
13 //异步
14 client.bulkAsync(request, new ActionListener<BulkResponse>() {
15     @Override
16     public void onResponse(BulkResponse bulkResponse) {
17 
18     }
19 
20     @Override
21     public void onFailure(Exception e) {
22 
23     }
24 });
  • 8.bulkprocessor 划重点!!!

BulkProcessor 简化bulk API的使用,并且使整个批量操作透明化。
BulkProcessor 的执行需要三部分组成:

  1. RestHighLevelClient :执行bulk请求并拿到响应对象。
  2. BulkProcessor.Listener:在执行bulk request之前、之后和当bulk response发生错误时调用。
  3. ThreadPool:bulk request在这个线程池中执行操作,这使得每个请求不会被挡住,在其他请求正在执行时,也可以接收新的请求。

示例代码:

 1 @Service
 2 public class ElasticSearchUtil {
 3     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);
 4 
 5     @Autowired
 6     private RestHighLevelClient restHighLevelClient;
 7 
 8     private BulkProcessor bulkProcessor;
 9 
10     @PostConstruct
11     public void init() {
12         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
13             @Override
14             public void beforeBulk(long executionId, BulkRequest request) {
15                 //重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数
16                 int numberOfActions = request.numberOfActions();
17                 LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
18             }
19 
20             @Override
21             public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
22                 //重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生。
23                 if (response.hasFailures()) {
24                     LOGGER.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
25                 } else {
26                     LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
27                 }
28                 BulkItemResponse[] responses = response.getItems();
29             }
30 
31             @Override
32             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
33                 //重写方法,如果发生错误就会调用。
34                 LOGGER.error("Failed to execute bulk", failure);
35             }
36         };
37 
38         //在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作
39         BulkProcessor bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
40                 // 1000条数据请求执行一次bulk
41                 .setBulkActions(1000)
42                 // 5mb的数据刷新一次bulk
43                 .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
44                 // 并发请求数量, 0不并发, 1并发允许执行
45                 .setConcurrentRequests(0)
46                 // 固定1s必须刷新一次
47                 .setFlushInterval(TimeValue.timeValueSeconds(1L))
48                 // 重试5次,间隔1s
49                 .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
50                 .build();
51         this.bulkProcessor = bulkProcessor;
52     }
53 
54     @PreDestroy
55     public void destroy() {
56         try {
57             bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
58         } catch (InterruptedException e) {
59             LOGGER.error("Failed to close bulkProcessor", e);
60         }
61         LOGGER.info("bulkProcessor closed!");
62     }
63 
64     /**
65      * 修改
66      *
67      * @param request
68      * @throws IOException
69      */
70     public void update(UpdateRequest request) {
71         this.bulkProcessor.add(request);
72     }
73 
74     /**
75      * 新增
76      *
77      * @param request
78      */
79     public void insert(IndexRequest request) {
80         this.bulkProcessor.add(request);
81     }
82 }

bulkProcessor使用用例:

 1         //新建三个 index 请求
 2         IndexRequest one = new IndexRequest("posts", "doc", "1").
 3                 source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
 4         IndexRequest two = new IndexRequest("posts", "doc", "2")
 5                 .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
 6         IndexRequest three = new IndexRequest("posts", "doc", "3")
 7                 .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
 8         //新的三条index请求加入到上面配置好的bulkProcessor里面。
 9         bulkProcessor.add(one);
10         bulkProcessor.add(two);
11         bulkProcessor.add(three);
12         // add many request here.
13         //bulkProcess必须被关闭才能使上面添加的操作生效
14         bulkProcessor.close(); //立即关闭
15         //关闭bulkProcess的两种方法:
16         try {
17             //2.调用awaitClose.
18             //简单来说,就是在规定的时间内,是否所有批量操作完成。全部完成,返回true,未完成返//回false
19             
20             boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
21             
22         } catch (InterruptedException e) {
23             // TODO Auto-generated catch block
24             e.printStackTrace();
25         }
  • 9.upsert api

update --当id不存在时将会抛出异常:

1 UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
2 UpdateResponse response = restHighLevelClient.update(request);

upsert--id不存在时就插入:

1 UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
2 UpdateResponse response = restHighLevelClient.update(request);
  • 10.search api

Search API提供了对文档的查询和聚合的查询。
它的基本形式:

1 SearchRequest searchRequest = new SearchRequest();  //构造search request .在这里无参,查询全部索引
2 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();//大多数查询参数要写在searchSourceBuilder里 
3 searchSourceBuilder.query(QueryBuilders.matchAllQuery());//增加match_all的条件。
1 SearchRequest searchRequest = new SearchRequest("posts"); //指定posts索引
2 searchRequest.types("doc"); //指定doc类型

使用SearchSourceBuilder

大多数的查询控制都可以使用SearchSourceBuilder实现。
举一个简单例子:

1 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); //构造一个默认配置的对象
2 sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); //设置查询
3 sourceBuilder.from(0); //设置从哪里开始
4 sourceBuilder.size(5); //每页5条
5 sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); //设置超时时间

配置好searchSourceBuilder后,将它传入searchRequest里:

1 SearchRequest searchRequest = new SearchRequest();
2 searchRequest.source(sourceBuilder);
1 //全量搜索
2 SearchRequest searchRequest = new SearchRequest();
3 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
4 searchSourceBuilder.query(QueryBuilders.matchAllQuery());
5 searchRequest.source(searchSourceBuilder);
6 SearchRequest searchRequest = new SearchRequest("index");
 1 //根据多个条件搜索
 2 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
 3 for (String id: ids) {
 4     TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
 5     boolQueryBuilder.should(termQueryBuilder);
 6 }
 7 SearchRequest searchRequest = new SearchRequest(index);
 8 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 9 searchSourceBuilder.query(boolQueryBuilder);
10 searchRequest.source(searchSourceBuilder);
11 SearchResponse response = null;
12     response = restHighLevelClient.search(searchRequest);
13 return response;
  • 11.search scroll api
 1 //scroll 分页搜索
 2 final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
 3 SearchRequest searchRequest = new SearchRequest("posts");
 4 searchRequest.scroll(scroll);
 5 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 6 searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
 7 searchRequest.source(searchSourceBuilder);
 8 
 9 SearchResponse searchResponse = client.search(searchRequest);
10 String scrollId = searchResponse.getScrollId();
11 SearchHit[] searchHits = searchResponse.getHits().getHits();
12 
13 while (searchHits != null && searchHits.length > 0) {
14     SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
15     scrollRequest.scroll(scroll);
16     searchResponse = client.searchScroll(scrollRequest);
17     scrollId = searchResponse.getScrollId();
18     searchHits = searchResponse.getHits().getHits();
19 
20 }
21 
22 ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
23 clearScrollRequest.addScrollId(scrollId);
24 ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
25 boolean succeeded = clearScrollResponse.isSucceeded();
  • 12.排序

 

SearchSourceBuilder可以添加一种或多种SortBuilder。
有四种特殊的排序实现:

    • field
    • score
    • GeoDistance
    • scriptSortBuilder
1 sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); //按照score倒序排列
2 sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));  //并且按照id正序排列
  • 13.过滤

默认情况下,searchRequest返回文档内容,与REST API一样,这里你可以重写search行为。例如,你可以完全关闭"_source"检索。

1 sourceBuilder.fetchSource(false);

该方法还接受一个或多个通配符模式的数组,以更细粒度地控制包含或排除哪些字段。

1 String[] includeFields = new String[] {"title", "user", "innerObject.*"};
2 String[] excludeFields = new String[] {"_type"};
3 sourceBuilder.fetchSource(includeFields, excludeFields);
  • 14.聚合

通过配置适当的 AggregationBuilder ,再将它传入SearchSourceBuilder里,就可以完成聚合请求了。
之前的文档里面,我们通过下面这条命令,导入了一千条account信息:

curl -H "Content-Type: application/json" -XPOST ‘localhost:9200/bank/account/_bulk?pretty&refresh‘ --data-binary "@accounts.json"

随后,我们介绍了如何通过聚合请求进行分组:

GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

我们将这一千条数据根据state字段分组,得到响应:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 999,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_state": {
      "doc_count_error_upper_bound": 20,
      "sum_other_doc_count": 770,
      "buckets": [
        {
          "key": "ID",
          "doc_count": 27
        },
        {
          "key": "TX",
          "doc_count": 27
        },
        {
          "key": "AL",
          "doc_count": 25
        },
        {
          "key": "MD",
          "doc_count": 25
        },
        {
          "key": "TN",
          "doc_count": 23
        },
        {
          "key": "MA",
          "doc_count": 21
        },
        {
          "key": "NC",
          "doc_count": 21
        },
        {
          "key": "ND",
          "doc_count": 21
        },
        {
          "key": "MO",
          "doc_count": 20
        },
        {
          "key": "AK",
          "doc_count": 19
        }
      ]
    }
  }
}

Java实现:

 1    @Test
 2     public void test2(){
 3         RestClient lowLevelRestClient = RestClient.builder(
 4                 new HttpHost("172.16.73.50", 9200, "http")).build();
 5         RestHighLevelClient client =
 6                 new RestHighLevelClient(lowLevelRestClient);
 7         SearchRequest searchRequest = new SearchRequest("bank");
 8         searchRequest.types("account");
 9         TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
10                 .field("state.keyword");
11         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
12         searchSourceBuilder.aggregation(aggregation);
13         searchSourceBuilder.size(0);
14         searchRequest.source(searchSourceBuilder);
15         try {
16             SearchResponse searchResponse = client.search(searchRequest);
17             System.out.println(searchResponse.toString());
18         } catch (IOException e) {
19             e.printStackTrace();
20         }
21         
22     }

Search response

Search response返回对象与其在API里的一样,返回一些元数据和文档数据。
首先,返回对象里的数据十分重要,因为这是查询的返回结果、使用分片情况、文档数据,HTTP状态码等

1 RestStatus status = searchResponse.status();
2 TimeValue took = searchResponse.getTook();
3 Boolean terminatedEarly = searchResponse.isTerminatedEarly();
4 boolean timedOut = searchResponse.isTimedOut();

其次,返回对象里面包含关于分片的信息和分片失败的处理:

1 int totalShards = searchResponse.getTotalShards();
2 int successfulShards = searchResponse.getSuccessfulShards();
3 int failedShards = searchResponse.getFailedShards();
4 for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
5     // failures should be handled here
6 }

取回searchHit

为了取回文档数据,我们要从search response的返回对象里先得到searchHit对象:

1 SearchHits hits = searchResponse.getHits();

取回文档数据:

 1     @Test
 2     public void test2(){
 3         RestClient lowLevelRestClient = RestClient.builder(
 4                 new HttpHost("172.16.73.50", 9200, "http")).build();
 5         RestHighLevelClient client =
 6                 new RestHighLevelClient(lowLevelRestClient);
 7         SearchRequest searchRequest = new SearchRequest("bank");
 8         searchRequest.types("account");
 9         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
10         searchRequest.source(searchSourceBuilder);
11         try {
12             SearchResponse searchResponse = client.search(searchRequest);
13             SearchHits searchHits = searchResponse.getHits();
14             SearchHit[] searchHit = searchHits.getHits();
15             for (SearchHit hit : searchHit) {
16                 System.out.println(hit.getSourceAsString());
17             }
18         } catch (IOException e) {
19             e.printStackTrace();
20         }
21         
22     }

根据需要,还可以转换成其他数据类型:

1 String sourceAsString = hit.getSourceAsString();
2 Map<String, Object> sourceAsMap = hit.getSourceAsMap();
3 String documentTitle = (String) sourceAsMap.get("title");
4 List<Object> users = (List<Object>) sourceAsMap.get("user");
5 Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

取回聚合数据

聚合数据可以通过SearchResponse返回对象,取到它的根节点,然后再根据名称取到聚合数据。

GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

响应:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 999,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_state": {
      "doc_count_error_upper_bound": 20,
      "sum_other_doc_count": 770,
      "buckets": [
        {
          "key": "ID",
          "doc_count": 27
        },
        {
          "key": "TX",
          "doc_count": 27
        },
        {
          "key": "AL",
          "doc_count": 25
        },
        {
          "key": "MD",
          "doc_count": 25
        },
        {
          "key": "TN",
          "doc_count": 23
        },
        {
          "key": "MA",
          "doc_count": 21
        },
        {
          "key": "NC",
          "doc_count": 21
        },
        {
          "key": "ND",
          "doc_count": 21
        },
        {
          "key": "MO",
          "doc_count": 20
        },
        {
          "key": "AK",
          "doc_count": 19
        }
      ]
    }
  }
}

Java实现:

 1     @Test
 2     public void test2(){
 3         RestClient lowLevelRestClient = RestClient.builder(
 4                 new HttpHost("172.16.73.50", 9200, "http")).build();
 5         RestHighLevelClient client =
 6                 new RestHighLevelClient(lowLevelRestClient);
 7         SearchRequest searchRequest = new SearchRequest("bank");
 8         searchRequest.types("account");
 9         TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
10                 .field("state.keyword");
11         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
12         searchSourceBuilder.aggregation(aggregation);
13         searchSourceBuilder.size(0);
14         searchRequest.source(searchSourceBuilder);
15         try {
16             SearchResponse searchResponse = client.search(searchRequest);
17             Aggregations aggs = searchResponse.getAggregations();
18             Terms byStateAggs = aggs.get("group_by_state");
19             Terms.Bucket b = byStateAggs.getBucketByKey("ID"); //只取key是ID的bucket
20             System.out.println(b.getKeyAsString()+","+b.getDocCount());
21             System.out.println("!!!");
22             List<? extends Bucket> aggList = byStateAggs.getBuckets();//获取bucket数组里所有数据
23             for (Bucket bucket : aggList) {
24                 System.out.println("key:"+bucket.getKeyAsString()+",docCount:"+bucket.getDocCount());;
25             }
26         } catch (IOException e) {
27             e.printStackTrace();
28         }
29     }

 

参考:https://www.jianshu.com/p/5cb91ed22956

参考:https://my.oschina.net/u/3795437/blog/2253366

以上是关于使用Java High Level REST Client操作elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

Elasticsearch java api操作(Java High Level Rest Client)

elasticsearch 7.7.0 最新版+Java High Level REST Client测试

SpringBoot整合ElasticSearch之Java High Level REST Client

ElasticSearch 使用 High Level REST Client 实现搜索等功能实战

Elasticsearch Java High-Level REST Client 建立一堆 TCP 连接并且在索引数据后不关闭它们

springboot使用rest-high-level-client集成elasticsearch 7.5.1