RestHighLevelClient操作ES的API

Posted Firm陈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RestHighLevelClient操作ES的API相关的知识,希望对你有一定的参考价值。

Create Index API

RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
 
        CreateIndexRequest request = new CreateIndexRequest("twitter_two");//创建索引
        //创建的每个索引都可以有与之关联的特定设置。
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2)
        );
        //创建索引时创建文档类型映射
        request.mapping("tweet",//类型定义
                "  \\n" +
                        "    \\"tweet\\": \\n" +
                        "      \\"properties\\": \\n" +
                        "        \\"message\\": \\n" +
                        "          \\"type\\": \\"text\\"\\n" +
                        "        \\n" +
                        "      \\n" +
                        "    \\n" +
                        "  ",//类型映射,需要的是一个JSON字符串
                XContentType.JSON);
 
        //为索引设置一个别名
        request.alias(
                new Alias("twitter_alias")
        );
        //可选参数
        request.timeout(TimeValue.timeValueMinutes(2));//超时,等待所有节点被确认(使用TimeValue方式)
        //request.timeout("2m");//超时,等待所有节点被确认(使用字符串方式)
 
        request.masterNodeTimeout(TimeValue.timeValueMinutes(1));//连接master节点的超时时间(使用TimeValue方式)
        //request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
 
        request.waitForActiveShards(2);//在创建索引API返回响应之前等待的活动分片副本的数量,以int形式表示。
        //request.waitForActiveShards(ActiveShardCount.DEFAULT);//在创建索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。
 
        //同步执行
        CreateIndexResponse createIndexResponse = client.indices().create(request);
        //异步执行
        //异步执行创建索引请求需要将CreateIndexRequest实例和ActionListener实例传递给异步方法:
        //CreateIndexResponse的典型监听器如下所示:
        //异步方法不会阻塞并立即返回。
        ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() 
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) 
                //如果执行成功,则调用onResponse方法;
            
            @Override
            public void onFailure(Exception e) 
                //如果失败,则调用onFailure方法。
            
        ;
        client.indices().createAsync(request, listener);//要执行的CreateIndexRequest和执行完成时要使用的ActionListener
 
        //返回的CreateIndexResponse允许检索有关执行的操作的信息,如下所示:
        boolean acknowledged = createIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求
        boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数

Delete Index API

RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
 
        DeleteIndexRequest request = new DeleteIndexRequest("twitter_two");//指定要删除的索引名称
        //可选参数:
        request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引删除(使用TimeValue形式)
        // request.timeout("2m"); //设置超时,等待所有节点确认索引删除(使用字符串形式)
 
        request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
        // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
 
        //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
 
        //同步执行
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(request);
 
  /*    //异步执行删除索引请求需要将DeleteIndexRequest实例和ActionListener实例传递给异步方法:
        //DeleteIndexResponse的典型监听器如下所示:
        //异步方法不会阻塞并立即返回。
        ActionListener<DeleteIndexResponse> listener = new ActionListener<DeleteIndexResponse>() 
            @Override
            public void onResponse(DeleteIndexResponse deleteIndexResponse) 
                //如果执行成功,则调用onResponse方法;
            
            @Override
            public void onFailure(Exception e) 
                //如果失败,则调用onFailure方法。
            
        ;
        client.indices().deleteAsync(request, listener);*/
 
        //Delete Index Response
        //返回的DeleteIndexResponse允许检索有关执行的操作的信息,如下所示:
        boolean acknowledged = deleteIndexResponse.isAcknowledged();//是否所有节点都已确认请求
 
 
        //如果找不到索引,则会抛出ElasticsearchException:
        try 
            request = new DeleteIndexRequest("does_not_exist");
            client.indices().delete(request);
         catch (ElasticsearchException exception) 
            if (exception.status() == RestStatus.NOT_FOUND) 
                //如果没有找到要删除的索引,要执行某些操作
            
        

Open Index API

RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
 
        OpenIndexRequest request = new OpenIndexRequest("twitter");//打开索引
        //可选参数:
        request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已打开(使用TimeValue形式)
        // request.timeout("2m"); //设置超时,等待所有节点确认索引已打开(使用字符串形式)
 
        request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
        // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
 
        request.waitForActiveShards(2);//在打开索引API返回响应之前等待的活动分片副本的数量,以int形式表示。
        //request.waitForActiveShards(ActiveShardCount.ONE);//在打开索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。
 
        //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
        request.indicesOptions(IndicesOptions.strictExpandOpen());
 
        //同步执行
        OpenIndexResponse openIndexResponse = client.indices().open(request);
 
        /*//异步执行打开索引请求需要将OpenIndexRequest实例和ActionListener实例传递给异步方法:
        //OpenIndexResponse的典型监听器如下所示:
        //异步方法不会阻塞并立即返回。
        ActionListener<OpenIndexResponse> listener = new ActionListener<OpenIndexResponse>() 
            @Override
            public void onResponse(OpenIndexResponse openIndexResponse) 
                //如果执行成功,则调用onResponse方法;
            
            @Override
            public void onFailure(Exception e) 
                //如果失败,则调用onFailure方法。
            
        ;
        client.indices().openAsync(request, listener);*/
 
        //Open Index Response
        //返回的OpenIndexResponse允许检索有关执行的操作的信息,如下所示:
        boolean acknowledged = openIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求
        boolean shardsAcknowledged = openIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数

Close Index API

RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
        CloseIndexRequest request = new CloseIndexRequest("index");//关闭索引
 
        //可选参数:
        request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已关闭(使用TimeValue形式)
        // request.timeout("2m"); //设置超时,等待所有节点确认索引已关闭(使用字符串形式)
 
        request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
        // request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
 
        //设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        //同步执行
        CloseIndexResponse closeIndexResponse = client.indices().close(request);
 
         /*//异步执行打开索引请求需要将CloseIndexRequest实例和ActionListener实例传递给异步方法:
        //CloseIndexResponse的典型监听器如下所示:
        //异步方法不会阻塞并立即返回。
        ActionListener<CloseIndexResponse> listener = new ActionListener<CloseIndexResponse>() 
            @Override
            public void onResponse(CloseIndexResponse closeIndexResponse) 
                 //如果执行成功,则调用onResponse方法;
            
            @Override
            public void onFailure(Exception e) 
                 //如果失败,则调用onFailure方法。
            
        ;
        client.indices().closeAsync(request, listener); */
 
        //Close Index Response
        //返回的CloseIndexResponse 允许检索有关执行的操作的信息,如下所示:
        boolean acknowledged = closeIndexResponse.isAcknowledged(); //指示是否所有节点都已确认请求

Single document APIs
Index API

RestHighLevelClient client = new RestHighLevelClient(
               RestClient.builder(
                       new HttpHost("localhost", 9200, "http"),
                       new HttpHost("localhost", 9201, "http")));
       IndexRequest indexRequest1 = new IndexRequest(
               "posts",//索引名称
               "doc",//类型名称
               "1");//文档ID
 
       //==============================提供文档源========================================
       //方式1:以字符串形式提供
       String jsonString = "" +
               "\\"user\\":\\"kimchy\\"," +
               "\\"postDate\\":\\"2013-01-30\\"," +
               "\\"message\\":\\"trying out Elasticsearch\\"" +
               "";
       indexRequest1.source(jsonString, XContentType.JSON);
 
       //方式2:以Map形式提供
       Map<String, Object> jsonMap = new HashMap<>();
       jsonMap.put("user", "kimchy");
       jsonMap.put("postDate", new Date());
       jsonMap.put("message", "trying out Elasticsearch");
       //Map会自动转换为JSON格式的文档源
       IndexRequest indexRequest2 = new IndexRequest("posts", "doc", "1")
               .source(jsonMap);
 
       // 方式3:文档源以XContentBuilder对象的形式提供,Elasticsearch内部会帮我们生成JSON内容
 
       XContentBuilder builder = XContentFactory.jsonBuilder();
       builder.startObject();
       
           builder.field("user", "kimchy");
           builder.field("postDate", new Date());
           builder.field("message", "trying out Elasticsearch");
       
       builder.endObject();
       IndexRequest indexRequest3 = new IndexRequest("posts", "doc", "1")
               .source(builder);
 
       //方式4:以Object key-pairs提供的文档源,它会被转换为JSON格式
       IndexRequest indexRequest4 = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
               "postDate", new Date(),
               "message", "trying out Elasticsearch");
 
       //===============================可选参数start====================================
       indexRequest1.routing("routing");//设置路由值
       indexRequest1.parent("parent");//设置parent值
 
       //设置超时:等待主分片变得可用的时间
       indexRequest1.timeout(TimeValue.timeValueSeconds(1));//TimeValue方式
       indexRequest1.timeout("1s");//字符串方式
 
       //刷新策略
       indexRequest1.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式
       indexRequest1.setRefreshPolicy("wait_for");//字符串方式
 
       indexRequest1.version(2);//设置版本
 
       indexRequest1.versionType(VersionType.EXTERNAL);//设置版本类型
 
       //操作类型
       indexRequest1.opType(DocWriteRequest.OpType.CREATE);//DocWriteRequest.OpType方式
       indexRequest1.opType("create");//字符串方式, 可以是 create 或 update (默认)
 
       //The name of the ingest pipeline to be executed before indexing the document
       indexRequest1.setPipeline("pipeline");
       
       //===============================执行====================================
       //同步执行
       IndexResponse indexResponse = client.index(indexRequest1);
 
       //异步执行
       //IndexResponse 的典型监听器如下所示:
       //异步方法不会阻塞并立即返回。
       ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() 
           @Override
           public void onResponse(IndexResponse indexResponse) 
                //执行成功时调用。 Response以参数方式提供
           
 
           @Override
           public void onFailure(Exception e) 
               //在失败的情况下调用。 引发的异常以参数方式提供
           
       ;
       //异步执行索引请求需要将IndexRequest实例和ActionListener实例传递给异步方法:
       client.indexAsync(indexRequest2, listener);
 
       //Index Response
       //返回的IndexResponse允许检索有关执行操作的信息,如下所示:
       String index = indexResponse.getIndex();
       String type = indexResponse.getType();
       String id = indexResponse.getId();
       long version = indexResponse.getVersion();
       if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) 
            //处理(如果需要)第一次创建文档的情况
        else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) 
            //处理(如果需要)文档被重写的情况
       
       ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
       if (shardInfo.getTotal() != shardInfo.getSuccessful()) 
            //处理成功分片数量少于总分片数量的情况
       
       if (shardInfo.getFailed() > 0) 
           for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) 
               String reason = failure.reason();//处理潜在的失败
           
       
 
       //如果存在版本冲突,则会抛出ElasticsearchException:
       IndexRequest request = new IndexRequest("posts", "doc", "1")
               .source("field", "value")
               .version(1);
       try 
           IndexResponse response = client.index(request);
        catch(ElasticsearchException e) 
           if (e.status() == RestStatus.CONFLICT) 
                //引发的异常表示返回了版本冲突错误
           
       
 
       //如果opType设置为创建但是具有相同索引,类型和ID的文档已存在,则也会发生同样的情况:
       request = new IndexRequest("posts", "doc", "1")
               .source("field", "value")
               .opType(DocWriteRequest.OpType.CREATE);
       try 
           IndexResponse response = client.index(request);
        catch(ElasticsearchException e) 
           if (e.status() == RestStatus.CONFLICT) 
                //引发的异常表示返回了版本冲突错误
           
       

Get API

RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));
 
        GetRequest getRequest = new GetRequest(
                "posts"以上是关于RestHighLevelClient操作ES的API的主要内容,如果未能解决你的问题,请参考以下文章

Es7.x使用RestHighLevelClient进行查询操作

Es7.x使用RestHighLevelClient进行增删改和批量操作

RestHighLevelClient操作ES的API

Springboot使用RestHighLevelClient7操作es

RestHighLevelClient查询ES

中间件:ES组件RestHighLevelClient用法详解