Spring与RestHighLevelClient

Posted Faylinn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring与RestHighLevelClient相关的知识,希望对你有一定的参考价值。

  • Elasticsearch连接方式有两种;分别为TCP协议HTTP协议

最近使用es比较多,之前使用一直是使用spring封装的spring-data-elasticsearch;关于spring-data-elasticsearch有以下几点比较难受:

  • 基于TCP协议的使用(不确定是否支持http, 公司XX云大佬推荐使用HTTP协议,好像是官方推荐?)

  • 版本对应比较恶心人

  • 不好用

  • 基于以上几点,索性抛弃spring-data-elasticsearch,自己造轮子;

  • 根据 官方文档 描述,我们选择使用RestHighLevelClient来实现es基础查询;


官方描述:

The Java REST Client comes in 2 flavors:

Java Low Level REST Client: the official low-level client for Elasticsearch. It allows to communicate with an Elasticsearch cluster through http. Leaves requests marshalling and responses un-marshalling to users. It is compatible with all Elasticsearch versions.
Java High Level REST Client: the official high-level client for Elasticsearch. Based on the low-level client, it exposes API specific methods and takes care of requests marshalling and responses un-marshalling.

  • 提供Java Low Level REST Client 版本和 Java High Level REST Client 版本:

    • Java Low Level REST Client 与所有Elasticsearch版本兼容(版本问题舒服)
    • 通过HTTP协议与Elasticsearch集群进行通信(大佬推荐)
    • Java High Level REST Client 是基于Java Low Level REST Client 版本实现更多高级API
  • 很显然我们选择RestHighLevelClient


Spring整合RestHighLevelClient

  1. 构建ElasticsearchClient
  • 查看RestHighLevelClient构造器可以发现可以使用RestClientBuilder来构建,简单demo如下
    /**
     * 连接超时时间
     */
    private final static int CONNECT_TIMEOUT = 5000;
    /**
     * 连接超时时间
     */
    private final static int SOCKET_TIMEOUT = 40000;
    /**
     * 获取连接的超时时间
     */
    private final static int CONNECTION_REQUEST_TIMEOUT = 1000;
    /**
     * 最大连接数
     */
    private final static int MAX_CONNECT_NUM = 100;
    /**
     * 最大路由连接数
     */
    private final static int MAX_CONNECT_ROUTE = 100;

    @Bean(name = "elasticsearchClient", destroyMethod = "close")
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(new HttpHost("host", "port", "http"));
        // 配置一些请求配置的参数
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT);
            return requestConfigBuilder;
        });
        // 配置一些httpClient的参数
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONNECT_NUM);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONNECT_ROUTE);
            return httpClientBuilder;
        });
        builder.setFailureListener(new RestClient.FailureListener(){
            @Override
            public void onFailure(HttpHost host) {
                // TODO do something when failed
                super.onFailure(host);
            }
        });
        return new RestHighLevelClient(builder);
    }
  • 支持一些回调与参数的配置,具体的API可自行查看RestClientBuilder的源码
  • 配置完client后我们可以使用client造一些简单的轮子, 如es默认查询只可以查询1000条数据,我们可以封装查询所有数据
    public List<SearchHit> searchAll(SearchRequest searchRequest) {
        try {
            List<SearchHit> hits = new ArrayList<>(16);
            int maxNum = searchRequest.source().size();
            searchRequest.scroll(TimeValue.timeValueMinutes(10));
            SearchResponse search = client.search(searchRequest);
            hits.addAll(Arrays.asList(search.getHits().getHits()));
            while (search.getHits().getHits().length == maxNum) {
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(search.getScrollId());
                searchScrollRequest.scroll(TimeValue.timeValueMinutes(10));
                search = client.searchScroll(searchScrollRequest);
                hits.addAll(Arrays.asList(search.getHits().getHits()));
            }
            return hits;
        } catch (IOException e) {
            log.error("Get message error.", e);
            return null;
        }
    }
  • 有了以上接口,我们可以查询一些常用数据,如以下为查询数据的简单使用:
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        boolBuilder.filter(QueryBuilders.termQuery("type", 0));
        boolBuilder.filter(QueryBuilders.termsQuery("id.keyword", id));

        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime");
        rangeQueryBuilder.gte(startTime);
        rangeQueryBuilder.lte(endTime);
        boolBuilder.filter(rangeQueryBuilder);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(9999);
        sourceBuilder.fetchSource(new String[]{"field1", "field2", "field3"}, new String[]{});
        sourceBuilder.query(boolBuilder);

        SearchRequest searchRequest = new SearchRequest("index");
        searchRequest.source(sourceBuilder);
        List<SearchHit> searchHits = repository.searchAll(searchRequest);
  • 具体API使用可查看官方文档

更新于2019-10-28

IndexRequest indexRequest = new IndexRequest(index, type, id);
indexRequest.source(entityMapper.mapToString(map), Requests.INDEX_CONTENT_TYPE);
return client.index(indexRequest);
  • 官方API中IndexRequest提供以下几种source方法:

    • 值得注意的是source(Map source)source(Map source, XContentType contentType) 方法,对于Map的传参,会进行类型校验;
    • 源码如下:
     public IndexRequest source(Map source, XContentType contentType) throws ElasticsearchGenerationException {
         try {
             XContentBuilder builder = XContentFactory.contentBuilder(contentType);
             builder.map(source);
             return source(builder);
         } catch (IOException e) {
             throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
         }
     }
    
  • 其中builder.map中的unknownValue方法会遍历参数进行逐一校验:

  • 源码如下:

   private XContentBuilder map(Map<String, ?> values, boolean ensureNoSelfReferences) throws IOException {
       if (values == null) {
           return this.nullValue();
       } else {
           if (ensureNoSelfReferences) {
               ensureNoSelfReferences(values);
           }

           this.startObject();
           Iterator var3 = values.entrySet().iterator();

           while(var3.hasNext()) {
               Entry<String, ?> value = (Entry)var3.next();
               this.field((String)value.getKey());
               this.unknownValue(value.getValue(), false);
           }

           this.endObject();
           return this;
       }
   }
  • 检验方法源码
    private void unknownValue(Object value, boolean ensureNoSelfReferences) throws IOException {
        if (value == null) {
            this.nullValue();
        } else {
            XContentBuilder.Writer writer = (XContentBuilder.Writer)WRITERS.get(value.getClass());
            if (writer != null) {
                writer.write(this, value);
            } else if (value instanceof Path) {
                this.value((Path)value);
            } else if (value instanceof Map) {
                Map<String, ?> valueMap = (Map)value;
                this.map(valueMap, ensureNoSelfReferences);
            } else if (value instanceof Iterable) {
                this.value((Iterable)value, ensureNoSelfReferences);
            } else if (value instanceof Object[]) {
                this.values((Object[])value, ensureNoSelfReferences);
            } else if (value instanceof ToXContent) {
                this.value((ToXContent)value);
            } else {
                if (!(value instanceof Enum)) {
                    throw new IllegalArgumentException("cannot write xcontent for unknown value of type " + value.getClass());
                }

                this.value(Objects.toString(value));
            }

        }
    }
  • 为了避免这个坑,可以使用jsonString来规避,具体使用如下:
	IndexRequest indexRequest = new IndexRequest(index, type, id);
        indexRequest.source(JSON.toJSONString(map), Requests.INDEX_CONTENT_TYPE);
        client.index(indexRequest);

以上是关于Spring与RestHighLevelClient的主要内容,如果未能解决你的问题,请参考以下文章

Mybatis与Spring的整合

spring常用注解作用与常用接口与后置处理器

这几天做spring与mybatis整合,配置文件到是会用了,但是否很明白spring.xml 与spring.mvc.xml区别?

spring cloud 与 spring boot 和 spring cloud alibab 版本号对应

mybatis与spring整合

Spring @Async 与 Spring WebFlux