应用接入ES-Springboot集成ES
Posted PersistentCoder
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了应用接入ES-Springboot集成ES相关的知识,希望对你有一定的参考价值。
Elasticsearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据,包括文本、数字、地理空间、结构化和非结构化数据。Elasticsearch 在 Apache Lucene 的基础上开发而成,由 Elasticsearch N.V.(即现在的 Elastic)于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名。一句话来说,Elasticsearch就是专业做搜索的,支持各种复杂的检索。
那么我们有比较复杂的检索诉求的场景下,如果传统关系型数据库无法支持或者说支持成本特别高,那么就可以考虑使用Elasticsearch(以下简称ES)来作为这种场景的替代方案。
一
环境准备
下载安装包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.1.zip
解压:
unzip elasticsearch-6.8.1.zip
修改默认配置:
## 修改jvm.options内存配置
-Xms512m-Xmx512m
## 修改elasticsearch.yaml网络访问限制,允许其他机器访问
network.host: 0.0.0.0
后台启动:
./bin/elasticsearch -d
检查启动情况:
curl http://x.x.x.x:9200
启动正常。
创建索引配置:
curl -H 'Content-Type: application/json' -XPUT http://localhost:9200/reverse -d'{
"settings" : {
"index" : {
"number_of_shards" : 3,
"number_of_replicas" : 2
}
}
}'
这里制定了3个分片和两个副本,可以不配置使用默认。
创建映射:
curl -H 'Content-Type: application/json' -XPUT 'http://localhost:9200/reverse/_mapping/return_order' -d '{
"properties" : {
"a" : {
"type" : "date"
}
...
}
}'
可以使用http://xxx.xxx.xxx.xxx:9200/reverse/_mapping/return_order查看配置的type映射内容。
二
应用集成ES
本篇我们只讲述应用集成ES完成基本crud的几种方式,对于业务数据同步到ES暂不做讲述。应用集成ES的方式大致有4种,在说具体实现方式之前引一个小插曲,看一下ES java api的一段话:
We plan on deprecating the TransportClient in Elasticsearch 7.0 and removing it
completely in 8.0. Instead, you should be using the Java High Level REST Client,
which executes HTTP requests rather than serialized Java requests. The migrationguidedescribes all the steps needed to migrate. The Java High Level REST Client currently has support for the more commonly usedAPIs, but there are a lot more that still need to be added. You can help us prioritiseby telling us which missing APIs you need for your application by adding a commentto this issue: Java high-level REST client completeness.
大致意思是ES7.0会废弃掉TransportClient(Netty基于tcp协议实现的客户端),8.0会完全移除,官方建议使用RestHighLevelClient基于http协议访问ES。
所以基于这个原因,应用集成ES按照通信协议维度,分别有以下实现方式:
TCP:
Jpa
ElasticSearchTemplate
Http
RestClient
RestHighLevelClient
Jpa是最传统的持久层框架此处不赘述,ES的Jpa由spring-data-elasticsearch实现,提供一些模板化的更新操作和查询,最大的缺点是不够灵活,对定制化操作不友好并且自定义实现有点繁杂。
ElasticSearchTemplate是类似于JdbcTemplate,也是spring-data-elasticsearch提供的对各种ES更新和查询操作的模板实现,相比ElasticsearchRepositoryapi更清晰易懂,除了支持通用的实现之外也能支持各种定制化的检索操作。
RestClient是低版本的Rest客户端,基于http协议实现,对于ES中各种操作也能够很好的支持,但是api相对复杂,无法提供更“java化”的交互,简单点说对于客户端与服务端的请求和响应棱角过明显,对于响应结果更偏向于程序自己解析和序列化。
RestHighLevelClient是基于低版本客户端RestClient实现的高亮Rest客户端,目前官方极力推荐使用RestHighLevelClient来访问ES,如果发现高版本RestClient缺失功能,则会降级到低版本RestClient来发送json请求。
接下来我们切入正题,Jpa和低版本RestClient这里不展开讲述,重点来看一下ElasticSearchTemplate和RestHighLevelClient这两种方式的实现。
增加ES相关配置,application.yml:
spring:
data:
elasticsearch:
cluster-name: elasticsearch
cluster-nodes: host:9300 #tcp是9300
local: false
repositories:
enable: false
引入依赖:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
</dependency>
访问ES实现:
## 自动注入ElasticSearchTemplate @Autowired private ElasticsearchTemplate elasticsearchTemplate; ## 查单个 public ReturnEsDO getByReturnId(String returnId) { SearchQuery query = new NativeSearchQueryBuilder().withQuery(new QueryStringQueryBuilder(returnId).field("returnId")).build(); List<ReturnEsDO> list = this.elasticsearchTemplate.queryForList(query,ReturnEsDO.class); if(null == list || list.isEmpty()) { log.warn("returnId={},result empty",returnId); return null; } return list.get(0); } ## 分页查询 public Page<ReturnEsDO> queryPage(ReturnEsQueryParam param) { String sortStr = param.getSort(); Sort sort = new Sort(SortEnum.DESC.getType().equalsIgnoreCase(sortStr) ? Sort.Direction.DESC : Sort.Direction.ASC,"appliedTime"); Pageable pageable = PageRequest.of(param.getPageNum() - 1,param.getPageSize(),sort); SearchQuery query = new NativeSearchQueryBuilder() .withIndices(index) .withTypes(type) .withQuery(this.buildQuery(param)) .withPageable(pageable) .build(); Page<ReturnEsDO> page = this.elasticsearchTemplate.queryForPage(query,ReturnEsDO.class); return page; } ## 新增doc public void save(ReturnEsDO returnEsDO) { IndexQuery indexQuery = this.buildIndexQuery(returnEsDO); String index = this.elasticsearchTemplate.index(indexQuery); log.info("save success;param={},index={}",returnEsDO,index); } ## 批量新增 public void batchSave(List<ReturnEsDO> esDOList) { List<IndexQuery> indexQueryList = esDOList.stream() .map(this::buildIndexQuery) .collect(Collectors.toList()); this.elasticsearchTemplate.bulkIndex(indexQueryList); } ## 删除文档 public void delete(String returnId) { DeleteQuery deleteQuery = new DeleteQuery(); deleteQuery.setQuery(QueryBuilders.matchQuery("returnId", returnId)); deleteQuery.setIndex(index); deleteQuery.setType(type); this.elasticsearchTemplate.delete(deleteQuery); log.info("delete success;returnId={}",returnId); } ## 更新文档 public void update(ReturnEsDO returnEsDO) { try { UpdateRequest updateRequest = new UpdateRequest() .index(index) .type(type) .id(returnEsDO.getReturnId()) .doc(this.buildUpdateDoc(returnEsDO)); UpdateQuery updateQuery = new UpdateQueryBuilder() .withIndexName(index) .withType(type) .withId(returnEsDO.getReturnId()) .withClass(ReturnEsDO.class) .withUpdateRequest(updateRequest) .build(); UpdateResponse response = this.elasticsearchTemplate.update(updateQuery); log.info("update end;param={},response={}",returnEsDO,response); } catch (Exception e) { log.error("update occur error;param={}",returnEsDO,e); } }
具体的构建查询和更新参数不展开描述。
该客户端是是目前官方推荐使用的RestClient。
增加ES相关配置:
es: host: address: host port: 9200 protocol:http
引入maven依赖:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> </dependency>
增加RestHighLevelClient配置:
@Configuration public class ElasticsearchConfig { @Value("${es.host.address}") private String esHostAddress; @Value("${es.host.port}") private Integer esHostPort; @Value("${es.hot.protocol}") private String protocol; @Bean(destroyMethod = "close") public RestHighLevelClient restHighLevelClient() { RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost(esHostAddress, esHostPort, protocol))); return client; } }
访问ES实现:
## 注入Client @Autowired private RestHighLevelClient restHighLevelClient; ## 查询单条记录 public ReturnEsDO getByReturnId(String returnId) { try { GetRequest getRequest = new GetRequest(index, type, returnId); GetResponse response = this.restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); Map<String, Object> map = response.getSourceAsMap(); return this.transferMap2DO(map); } catch (Exception e) { log.error("ReturnRepository.getByReturnId occur error;returnId={}", returnId, e); return null; } } ## 分页查询 public PageResp<ReturnEsDO> queryPage(ReturnEsQueryParam param) { try { SearchRequest request = new SearchRequest(index); QueryBuilder queryBuilder = this.buildQuery(param); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(queryBuilder); sourceBuilder.size(param.getPageSize()); sourceBuilder.from((param.getPageNum()-1)*param.getPageSize()); String sortStr = param.getSort(); sourceBuilder.sort("appliedTime", SortEnum.DESC.getType().equalsIgnoreCase(sortStr) ? SortOrder.DESC : SortOrder.ASC); sourceBuilder.trackTotalHits(true); request.source(sourceBuilder); SearchResponse response = this.restHighLevelClient.search(request, RequestOptions.DEFAULT); if (null == response || null == response.getHits()) { log.warn("ReturnEsRepository.queryPage response empty;param={}", param); return PageResp.wrap(0, 0, Collections.emptyList()); } SearchHits result = response.getHits(); int totalHits = (int) result.getTotalHits(); if (totalHits <= 0) { log.warn("ReturnEsRepository.queryPage response find not hits;param={}", param); return PageResp.wrap(0, 0, Collections.emptyList()); } List<ReturnEsDO> list = new ArrayList<>(); for (SearchHit hit : result.getHits()) { ReturnEsDO esDO = JSON.parseObject(hit.getSourceAsString(), ReturnEsDO.class); list.add(esDO); } return PageResp.wrap(totalHits, this.calculateTotalPage(totalHits, param.getPageSize()), list); } catch (Exception e) { log.error("ReturnEsRepository.queryPage occur error;param={}", param, e); return PageResp.wrap(0, 0, Collections.emptyList()); } } ## 新增文档 public void save(ReturnEsDO returnEsDO) { try { IndexRequest indexRequest = new IndexRequest(index, type, returnEsDO.getReturnId()).source(JSON.toJSONString(returnEsDO), XContentType.JSON); IndexResponse response = this.restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("ReturnEsRepository.save success;param={},response={}", returnEsDO, response); } catch (Exception e) { log.error("ReturnEsRepository.save occur error;returnEsDO={}", JSON.toJSONString(returnEsDO), e); } } ## 批量新增文档 public void batchSave(List<ReturnEsDO> esDOList) { try { BulkRequest request = new BulkRequest(); for (ReturnEsDO esDO : esDOList) { IndexRequest indexRequest = new IndexRequest(index, type, esDO.getReturnId()).source(JSON.toJSONString(esDO), XContentType.JSON); request.add(indexRequest); } BulkResponse response = this.restHighLevelClient.bulk(request, RequestOptions.DEFAULT); log.info("ReturnEsRepository.batchSave success;response={}", response); } catch (Exception e) { log.error("ReturnEsRepository.batchSave occur error;param={}", JSON.toJSONString(esDOList), e); } } ## 删除文档 public void delete(String returnId) { try { DeleteRequest request = new DeleteRequest(index, type, returnId); DeleteResponse response = this.restHighLevelClient.delete(request, RequestOptions.DEFAULT); log.info("ReturnEsRepository.delete success;returnId={},response={}", returnId, response); } catch (Exception e) { log.error("ReturnEsRepository.delete occur error;returnId={}", returnId, e); } } ## 更新文档 public void update(ReturnEsDO returnEsDO) { try { UpdateRequest updateRequest = new UpdateRequest() .index(index) .type(type) .id(returnEsDO.getReturnId()) .doc(this.buildUpdateDoc(returnEsDO)); UpdateResponse response = this.restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); log.info("ReturnEsRepository.update end;param={},response={}", returnEsDO, response); } catch (Exception e) { log.error("ReturnEsRepository.update occur error;param={}", returnEsDO, e); } }
三
容易踩到的坑
1.ElasticSearchTemplate方式端口问题
ElasticSearchTemplate通过tcp协议访问ES,端口默认是9300,容易写成9200(http访问端口)。
2.ElasticSearchTemplate分页查询问题
ElasticSearchTemplate#queryForPage分页查询从第0页开始,如果和mysql分页查询一样从第1页开始容易查不到数据。
3.Amazon Elasticsearch Service不支持tcp协议
如果公司用的是AWS提供的ES服务,那么在开发之前一定要确认好,因为Amazon的ES服务压根不支持tcp协议访问,所以Jpa和ElasticSearchTemplate这两种实现完全失效,如果一不小心在开发和测试环境都是用这两种方式编码和测试,那么很不幸,代码重写切换成RestClient或者高亮RestClient。
4.RestHighLevelClient端口问题
虽然RestHighLevelClient通过http方式访问ES,但是有的ES服务可能是http也可能是https,在配置host和端口的时候一定要注意,http默认端口是80,https默认端口是443,否则会出现服务连不上或者节点找不到等一系列问题。
5.RestHighLevelClient分页问题
RestHighLevelClient分页查询通过from和size属性指定,需要注意的是这里的from不是mysql查询中的pageNumber,而是类似offset起始位点,如果设置成查询页码,会出现查询数据错乱的问题。
以上是关于应用接入ES-Springboot集成ES的主要内容,如果未能解决你的问题,请参考以下文章