ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解
Posted |旧市拾荒|
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解相关的知识,希望对你有一定的参考价值。
1、前置
Java API 文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html 。Java REST 客户端分为两种:Low Level(偏向底层)和 High Level(在 Low Level 之上的高级封装)。
导入相关maven依赖
<!-- Elasticsearch client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
    <!-- Exclude the transitive core artifact; it is declared explicitly below -->
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.0.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <version>2.0.6.RELEASE</version>
</dependency>
2、配置application.yml
spring:
  application:
    name: search-service
# Custom (non-Spring) section; read via the property key config.elasticsearch.hostlist
config:
  elasticsearch:
    hostlist: 127.0.0.1:9200 # separate multiple nodes with commas, e.g. host1:9200,host2:9200
3、配置类
import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticsearchConfig
@Value("$config.elasticsearch.hostlist") private String hostlist; @Bean(destroyMethod = "close") public RestHighLevelClient restHighLevelClient() String[] split = hostlist.split(","); HttpHost[] httpHostsArray = new HttpHost[split.length]; for (int i = 0; i < split.length; i++) String item=split[i]; httpHostsArray[i]=new HttpHost(item.split(":")[0],Integer.parseInt(item.split(":")[1]),"http"); return new RestHighLevelClient(RestClient.builder(httpHostsArray));
4、查询测试方法
@SpringBootTest @RunWith(SpringRunner.class) //查询文档 @Test public void testGet() throws IOException //构建请求 GetRequest getRequest = new GetRequest("test_post", "1"); //========================可选参数 start====================== //为特定字段配置_source_include // String[] includes = new String[]"user", "message"; // String[] excludes = Strings.EMPTY_ARRAY; // FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); // getRequest.fetchSourceContext(fetchSourceContext); //为特定字段配置_source_excludes // String[] includes1 = new String[]"user", "message"; // String[] excludes1 = Strings.EMPTY_ARRAY; // FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1); // getRequest.fetchSourceContext(fetchSourceContext1); //设置路由 // getRequest.routing("routing"); // ========================可选参数 end===================== //查询 同步查询 GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); //异步查询 // ActionListener<GetResponse> listener = new ActionListener<GetResponse>() // //查询成功时的立马执行的方法 // @Override // public void onResponse(GetResponse getResponse) // long version = getResponse.getVersion(); // String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式) // System.out.println(sourceAsString); // // // //查询失败时的立马执行的方法 // @Override // public void onFailure(Exception e) // e.printStackTrace(); // // ; // //执行异步请求 // client.getAsync(getRequest, RequestOptions.DEFAULT, listener); // try // Thread.sleep(5000); // catch (InterruptedException e) // e.printStackTrace(); // // 获取结果 if (getResponse.isExists()) long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式) System.out.println(sourceAsString); byte[] sourceAsBytes = getResponse.getSourceAsBytes();//以字节形式返回 Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); System.out.println(sourceAsMap);
5、新增测试方法
@Test public void testAdd() throws IOException // 1构建请求 IndexRequest request=new IndexRequest("test_posts"); request.id("3"); // =======================构建文档============================ // 构建方法1 String jsonString="\\n" + " \\"user\\":\\"tomas J\\",\\n" + " \\"postDate\\":\\"2019-07-18\\",\\n" + " \\"message\\":\\"trying out es3\\"\\n" + ""; request.source(jsonString, XContentType.JSON); // 构建方法2 // Map<String,Object> jsonMap=new HashMap<>(); // jsonMap.put("user", "tomas"); // jsonMap.put("postDate", "2019-07-18"); // jsonMap.put("message", "trying out es2"); // request.source(jsonMap); // 构建方法3 // XContentBuilder builder= XContentFactory.jsonBuilder(); // builder.startObject(); // // builder.field("user", "tomas"); // builder.timeField("postDate", new Date()); // builder.field("message", "trying out es2"); // // builder.endObject(); // request.source(builder); // 构建方法4 // request.source("user","tomas", // "postDate",new Date(), // "message","trying out es2"); // // ========================可选参数=================================== //设置超时时间 request.timeout(TimeValue.timeValueSeconds(1)); request.timeout("1s"); //自己维护版本号 // request.version(2); // request.versionType(VersionType.EXTERNAL); // 2执行 //同步 IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); //异步 // ActionListener<IndexResponse> listener=new ActionListener<IndexResponse>() // @Override // public void onResponse(IndexResponse indexResponse) // // // // @Override // public void onFailure(Exception e) // // // ; // client.indexAsync(request,RequestOptions.DEFAULT, listener ); // try // Thread.sleep(5000); // catch (InterruptedException e) // e.printStackTrace(); // // 3获取结果 String index = indexResponse.getIndex(); String id = indexResponse.getId(); //获取插入的类型 if(indexResponse.getResult()== DocWriteResponse.Result.CREATED) DocWriteResponse.Result result=indexResponse.getResult(); System.out.println("CREATED:"+result); else if(indexResponse.getResult()== DocWriteResponse.Result.UPDATED) 
DocWriteResponse.Result result=indexResponse.getResult(); System.out.println("UPDATED:"+result); ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if(shardInfo.getTotal()!=shardInfo.getSuccessful()) System.out.println("处理成功的分片数少于总分片!"); if(shardInfo.getFailed()>0) for (ReplicationResponse.ShardInfo.Failure failure:shardInfo.getFailures()) String reason = failure.reason();//处理潜在的失败原因 System.out.println(reason);
6、修改测试方法
@Test public void testUpdate() throws IOException // 1构建请求 UpdateRequest request = new UpdateRequest("test_posts", "3"); Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "tomas JJ"); request.doc(jsonMap); //===============================可选参数========================================== request.timeout("1s");//超时时间 //重试次数 request.retryOnConflict(3); //设置在继续更新之前,必须激活的分片数 // request.waitForActiveShards(2); //所有分片都是active状态,才更新 // request.waitForActiveShards(ActiveShardCount.ALL); // 2执行 // 同步 UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); // 异步 // 3获取数据 updateResponse.getId(); updateResponse.getIndex(); //判断结果 if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) DocWriteResponse.Result result = updateResponse.getResult(); System.out.println("CREATED:" + result); else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) DocWriteResponse.Result result = updateResponse.getResult(); System.out.println("UPDATED:" + result); else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED) DocWriteResponse.Result result = updateResponse.getResult(); System.out.println("DELETED:" + result); else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) //没有操作 DocWriteResponse.Result result = updateResponse.getResult(); System.out.println("NOOP:" + result);
7、删除测试方法
@Test public void testDelete() throws IOException // 1构建请求 DeleteRequest request =new DeleteRequest("test_posts","3"); //可选参数 // 2执行 DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT); // 3获取数据 deleteResponse.getId(); deleteResponse.getIndex(); DocWriteResponse.Result result = deleteResponse.getResult(); System.out.println(result);
8、批量(bulk)测试方法
@Test public void testBulk() throws IOException // 1创建请求 BulkRequest request = new BulkRequest(); // request.add(new IndexRequest("post").id("1").source(XContentType.JSON, "field", "1")); // request.add(new IndexRequest("post").id("2").source(XContentType.JSON, "field", "2")); request.add(new UpdateRequest("post","2").doc(XContentType.JSON, "field", "3")); request.add(new DeleteRequest("post").id("1")); // 2执行 BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse itemResponse : bulkResponse) DocWriteResponse itemResponseResponse = itemResponse.getResponse(); switch (itemResponse.getOpType()) case INDEX: case CREATE: IndexResponse indexResponse = (IndexResponse) itemResponseResponse; indexResponse.getId(); System.out.println(indexResponse.getResult()); break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse; updateResponse.getIndex(); System.out.println(updateResponse.getResult()); break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse; System.out.println(deleteResponse.getResult()); break;
本文来自博客园,作者:|旧市拾荒|,转载请注明原文链接:https://www.cnblogs.com/xiaoyh/p/15982964.html
以上是关于ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解的主要内容,如果未能解决你的问题,请参考以下文章
ElasticSearch 7.3采用restful风格 基本的增删查改语句
Elasticsearch 学习+SpringBoot实战教程