ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解

Posted |旧市拾荒|

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解相关的知识,希望对你有一定的参考价值。

1、前置

Java API 文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html

官方 Java REST 客户端分为两种:Low Level(偏向底层)和 High Level(高级封装)。本文使用 High Level 客户端(RestHighLevelClient)。

导入相关maven依赖

    <!-- Elasticsearch high-level REST client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <!-- The transitive org.elasticsearch:elasticsearch artifact is excluded here
             and declared explicitly below with version 7.3.0.
             NOTE(review): presumably done so the core library version is pinned to
             match the client instead of being overridden by other dependency
             management; confirm against the project's parent POM. -->
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Elasticsearch core library, same version as the client above -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>

    <!-- Spring Boot core starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.0.6.RELEASE</version>
    </dependency>
    <!-- Spring Boot test support, test scope only -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <version>2.0.6.RELEASE</version>
    </dependency>

2、配置application.yml

spring:
  application:
    name: search-service
# Custom settings; config.elasticsearch.hostlist is read by the configuration class via @Value
config:
  elasticsearch:
    hostlist: 127.0.0.1:9200 # separate multiple nodes with commas

3、配置类

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig 
   @Value(
"$config.elasticsearch.hostlist") private String hostlist; @Bean(destroyMethod = "close") public RestHighLevelClient restHighLevelClient() String[] split = hostlist.split(","); HttpHost[] httpHostsArray = new HttpHost[split.length]; for (int i = 0; i < split.length; i++) String item=split[i]; httpHostsArray[i]=new HttpHost(item.split(":")[0],Integer.parseInt(item.split(":")[1]),"http"); return new RestHighLevelClient(RestClient.builder(httpHostsArray));

 4、查询测试方法

@SpringBootTest
@RunWith(SpringRunner.class)
//查询文档
       @Test
    public void testGet() throws IOException 
        //构建请求
        GetRequest getRequest = new GetRequest("test_post", "1");

        //========================可选参数 start======================
        //为特定字段配置_source_include
//        String[] includes = new String[]"user", "message";
//        String[] excludes = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
//        getRequest.fetchSourceContext(fetchSourceContext);

        //为特定字段配置_source_excludes
//        String[] includes1 = new String[]"user", "message";
//        String[] excludes1 = Strings.EMPTY_ARRAY;
//        FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
//        getRequest.fetchSourceContext(fetchSourceContext1);

        //设置路由
//        getRequest.routing("routing");

        // ========================可选参数 end=====================

        //查询 同步查询
      GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

        //异步查询
//        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() 
//            //查询成功时的立马执行的方法
//            @Override
//            public void onResponse(GetResponse getResponse) 
//                long version = getResponse.getVersion();
//                String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式)
//                System.out.println(sourceAsString);
//            
//
//            //查询失败时的立马执行的方法
//            @Override
//            public void onFailure(Exception e) 
//                e.printStackTrace();
//            
//        ;
//        //执行异步请求
//        client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
//        try 
//            Thread.sleep(5000);
//         catch (InterruptedException e) 
//            e.printStackTrace();
//        

        // 获取结果
        if (getResponse.isExists()) 
            long version = getResponse.getVersion();

            String sourceAsString = getResponse.getSourceAsString();//检索文档(String形式)
            System.out.println(sourceAsString);
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();//以字节形式返回
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            System.out.println(sourceAsMap);
        
    

5、新增测试方法

@Test
    public void testAdd() throws IOException 
//        1构建请求
        IndexRequest request=new IndexRequest("test_posts");
        request.id("3");
//        =======================构建文档============================
//        构建方法1
        String jsonString="\\n" +
                "  \\"user\\":\\"tomas J\\",\\n" +
                "  \\"postDate\\":\\"2019-07-18\\",\\n" +
                "  \\"message\\":\\"trying out es3\\"\\n" +
                "";
        request.source(jsonString, XContentType.JSON);

//        构建方法2
//        Map<String,Object> jsonMap=new HashMap<>();
//        jsonMap.put("user", "tomas");
//        jsonMap.put("postDate", "2019-07-18");
//        jsonMap.put("message", "trying out es2");
//        request.source(jsonMap);

//        构建方法3
//        XContentBuilder builder= XContentFactory.jsonBuilder();
//        builder.startObject();
//        
//            builder.field("user", "tomas");
//            builder.timeField("postDate", new Date());
//            builder.field("message", "trying out es2");
//        
//        builder.endObject();
//        request.source(builder);
//        构建方法4
//        request.source("user","tomas",
//                    "postDate",new Date(),
//                "message","trying out es2");
//
//        ========================可选参数===================================
        //设置超时时间
        request.timeout(TimeValue.timeValueSeconds(1));
        request.timeout("1s");

        //自己维护版本号
//        request.version(2);
//        request.versionType(VersionType.EXTERNAL);

//        2执行
        //同步
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        //异步
//        ActionListener<IndexResponse> listener=new ActionListener<IndexResponse>() 
//            @Override
//            public void onResponse(IndexResponse indexResponse) 
//
//            
//
//            @Override
//            public void onFailure(Exception e) 
//
//            
//        ;
//        client.indexAsync(request,RequestOptions.DEFAULT, listener ); 
//        try 
//            Thread.sleep(5000);
//         catch (InterruptedException e) 
//            e.printStackTrace();
//        


//        3获取结果
        String index = indexResponse.getIndex();
        String id = indexResponse.getId();
        //获取插入的类型
        if(indexResponse.getResult()== DocWriteResponse.Result.CREATED)
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("CREATED:"+result);
        else if(indexResponse.getResult()== DocWriteResponse.Result.UPDATED)
            DocWriteResponse.Result result=indexResponse.getResult();
            System.out.println("UPDATED:"+result);
        

        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if(shardInfo.getTotal()!=shardInfo.getSuccessful())
            System.out.println("处理成功的分片数少于总分片!");
        
        if(shardInfo.getFailed()>0)
           for (ReplicationResponse.ShardInfo.Failure failure:shardInfo.getFailures()) 
               String reason = failure.reason();//处理潜在的失败原因
               System.out.println(reason);
           
        
    

6、修改测试方法

  @Test
    public void testUpdate() throws IOException 
//        1构建请求
        UpdateRequest request = new UpdateRequest("test_posts", "3");
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "tomas JJ");
        request.doc(jsonMap);
//===============================可选参数==========================================
        request.timeout("1s");//超时时间

        //重试次数
        request.retryOnConflict(3);

        //设置在继续更新之前,必须激活的分片数
//        request.waitForActiveShards(2);
        //所有分片都是active状态,才更新
//        request.waitForActiveShards(ActiveShardCount.ALL);

//        2执行
//        同步
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//        异步

//        3获取数据
        updateResponse.getId();
        updateResponse.getIndex();

        //判断结果
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) 
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("CREATED:" + result);
         else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) 
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("UPDATED:" + result);
        else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED)
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("DELETED:" + result);
        else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP)
            //没有操作
            DocWriteResponse.Result result = updateResponse.getResult();
            System.out.println("NOOP:" + result);
        
    

7、删除测试方法

 @Test
    public void testDelete() throws IOException 
//        1构建请求
        DeleteRequest request =new DeleteRequest("test_posts","3");
        //可选参数

//        2执行
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

//        3获取数据
        deleteResponse.getId();
        deleteResponse.getIndex();

        DocWriteResponse.Result result = deleteResponse.getResult();
        System.out.println(result);

8、批量(bulk)测试方法

@Test
    public void testBulk() throws IOException 
//        1创建请求
        BulkRequest request = new BulkRequest();
//        request.add(new IndexRequest("post").id("1").source(XContentType.JSON, "field", "1"));
//        request.add(new IndexRequest("post").id("2").source(XContentType.JSON, "field", "2"));

        request.add(new UpdateRequest("post","2").doc(XContentType.JSON, "field", "3"));
        request.add(new DeleteRequest("post").id("1"));

//        2执行
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

        for (BulkItemResponse itemResponse : bulkResponse) 
            DocWriteResponse itemResponseResponse = itemResponse.getResponse();

            switch (itemResponse.getOpType()) 
                case INDEX:
                case CREATE:
                    IndexResponse indexResponse = (IndexResponse) itemResponseResponse;
                    indexResponse.getId();
                    System.out.println(indexResponse.getResult());
                    break;
                case UPDATE:
                    UpdateResponse updateResponse = (UpdateResponse) itemResponseResponse;
                    updateResponse.getIndex();
                    System.out.println(updateResponse.getResult());
                    break;
                case DELETE:
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponseResponse;
                    System.out.println(deleteResponse.getResult());
                    break;
            
        
    

 

以上是关于ElasticSearch 7.3 结合Spring boot进行增删改查和批量(bulk)详解的主要内容,如果未能解决你的问题,请参考以下文章

ElasticSearch 7.3采用restful风格 基本的增删查改语句

Elasticsearch1.7.3升级到2.4.2记录

Centos 7.3 简便搭建EFK日志分析

Elasticsearch 学习+SpringBoot实战教程

Elasticsearch学习 spring boot整合Elasticsearch的原生方式

springboot 使用 elasticsearch(使用)