ES Java High Level REST Client官方索引和文档操作指导

Posted KeepGoing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ESJava High Level REST Client官方索引和文档操作指导相关的知识,希望对你有一定的参考价值。

索引操作和文档基本操作


import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;

import com.alibaba.fastjson.JSON;

/**
 * Integration tests for the Elasticsearch 7.6.x High Level REST Client.
 *
 * <p>Covers index management (create / exists / delete) and document operations (index, exists,
 * get, update, delete, bulk, search). Every test talks to a live cluster through the
 * {@code restHighLevelClient} bean and prints the response rather than asserting on it.
 */
@SpringBootTest
public class ElasticsearchJdApplicationTests {

    /** Index shared by all tests, so create / exists / delete all operate on the same index. */
    private static final String INDEX = "kuang_index";

    // Client bean, selected by name.
    @Autowired
    @Qualifier("restHighLevelClient")
    private RestHighLevelClient client;

    /** Creates the index; REST equivalent: {@code PUT kuang_index}. */
    @Test
    void testCreateIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(INDEX);
        // Index-level operations go through the IndicesClient returned by client.indices().
        CreateIndexResponse createIndexResponse =
                client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println(createIndexResponse);
    }

    /** Prints whether the index used by the other tests exists. */
    @Test
    void testExistIndex() throws IOException {
        GetIndexRequest request = new GetIndexRequest(INDEX);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println(exists);
    }

    /** Deletes the index and prints whether the deletion was acknowledged. */
    @Test
    void testDeleteIndex() throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(INDEX);
        AcknowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
        System.out.println(delete.isAcknowledged());
    }

    /** Indexes one document with id "1"; REST equivalent: {@code PUT /kuang_index/_doc/1}. */
    @Test
    void testAddDocument() throws IOException {
        User user = new User("狂神说", 3);
        IndexRequest request = new IndexRequest(INDEX);
        request.id("1");
        // Primary-shard timeout; setting it once is enough.
        request.timeout("1s");
        // The document body is the user serialized to JSON by fastjson.
        request.source(JSON.toJSONString(user), XContentType.JSON);
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        System.out.println(indexResponse.toString());
        // REST status of the write, e.g. CREATED for a newly created document.
        System.out.println(indexResponse.status());
    }

    /** Prints whether document "1" exists, without fetching its _source or stored fields. */
    @Test
    void testIsExists() throws IOException {
        GetRequest getRequest = new GetRequest(INDEX, "1");
        // Only existence matters, so skip _source and stored fields to keep the request light.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println(exists);
    }

    /** Fetches document "1" and prints its source followed by the full response. */
    @Test
    void testGetDocument() throws IOException {
        GetRequest getRequest = new GetRequest(INDEX, "1");
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        System.out.println(getResponse.getSourceAsString()); // document body only
        System.out.println(getResponse); // same content as the REST GET response
    }

    /** Partially updates document "1" with the fields of a new {@code User}. */
    @Test
    void testUpdateRequest() throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(INDEX, "1");
        updateRequest.timeout("1s");
        User user = new User("狂神说Java", 18);
        // doc(...) supplies a partial document that is merged into the existing one.
        updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
        UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(updateResponse.status());
    }

    /** Deletes document "1" and prints the REST status. */
    @Test
    void testDeleteRequest() throws IOException {
        DeleteRequest request = new DeleteRequest(INDEX, "1");
        request.timeout("1s");
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(deleteResponse.status());
    }

    /**
     * Bulk-indexes several users with ids "1".."n". Real projects usually load data this way.
     * Bulk updates or deletes only need a different request type inside the loop.
     */
    @Test
    void testBulkRequest() throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("10s");
        ArrayList<User> userList = new ArrayList<>();
        userList.add(new User("kuangshen1", 3));
        userList.add(new User("kuangshen2", 3));
        userList.add(new User("kuangshen3", 3));
        userList.add(new User("qinjiang1", 3));
        userList.add(new User("qinjiang1", 3));
        userList.add(new User("qinjiang1", 3));
        for (int i = 0; i < userList.size(); i++) {
            bulkRequest.add(new IndexRequest(INDEX)
                    .id(String.valueOf(i + 1))
                    .source(JSON.toJSONString(userList.get(i)), XContentType.JSON));
        }
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        // false means every item in the bulk succeeded.
        System.out.println(bulkResponse.hasFailures());
        if (bulkResponse.hasFailures()) {
            // hasFailures() alone does not say which items failed or why.
            System.out.println(bulkResponse.buildFailureMessage());
        }
    }

    /**
     * Runs an exact-match term query on {@code name} and prints the hits.
     *
     * <p>Building blocks: SearchRequest (the request), SearchSourceBuilder (query, timeout,
     * highlighting via HighlightBuilder, ...), and QueryBuilders factories such as termQuery
     * (exact match) and matchAllQuery (every document), which mirror the REST query DSL.
     */
    @Test
    void testSearch() throws IOException {
        SearchRequest searchRequest = new SearchRequest(INDEX);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("name", "qinjiang1");
        sourceBuilder.query(termQueryBuilder);
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(searchResponse.getHits()));
        System.out.println("=================================");
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            System.out.println(hit.getSourceAsMap());
        }
    }
}

REST high level client Javadoc(7.8)

文档接口Document API

Index API 增加文档

ElasticSearch可以直接新增数据,只要你指定了index(索引库名称)即可。在新增的时候你可以自己指定主键ID,也可以不指定,由ElasticSearch自身生成。Elasticsearch Java High Level REST Client新增数据提供了四种方法。

方式一:jsonString

使用IndexRequest设置JSON格式的字符串来新增文档,可以借助第三方库(如fastjson、Jackson)将对象直接转换为JSON字符串

// Target index
IndexRequest request = new IndexRequest("posts");
// Document id
request.id("1");
// JSON document as a string; a library such as fastjson or Jackson can produce it,
// e.g. JSON.toJSONString(user). Inner double quotes must be escaped for the literal to compile.
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON);

方式二:Map

通过map创建,会自动转换成JSON的数据

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// Document source provided as a Map which gets automatically converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1").source(jsonMap);

方式三:XContentBuilder

可以借助XContentBuilder创建对象,会自动转换为JSON格式

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
// Document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1").source(builder); 

方式四:key-pairs形式

直接使用对象键对形式构建,会自动转换为JSON格式

// Document source provided as Object key-pairs, which gets converted to JSON format
IndexRequest indexRequest = new IndexRequest("posts")
    .id("1")
    .source("user", "kimchy",
        "postDate", new Date(),
        "message", "trying out Elasticsearch");

推荐使用第2、3种方式,代码更易读。

可选参数

IndexRequest提供了以下可选参数:

// 路由参数
request.routing("routing"); 

// 以TimeValue形式设置主分片超时时间
request.timeout(TimeValue.timeValueSeconds(1)); 
// 以String形式设置主分片超时时间
request.timeout("1s");

// 使用WriteRequest.RefreshPolicy实例设置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String设置刷新策略
request.setRefreshPolicy("wait_for"); 

// 设置version
request.version(2); 
// 设置version type
request.versionType(VersionType.EXTERNAL); 

// 使用DocWriteRequest.OpType值设置操作类型
request.opType(DocWriteRequest.OpType.CREATE); 
// 使用String设置操作类型
request.opType("create"); 

// 请求执行前需要执行的 ingest pipeline
request.setPipeline("pipeline"); 

执行操作

分为同步和异步,listener说明同“查询接口Search API->执行查询”小节描述。注意listener泛型为IndexResponse。

// 同步
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// 异步
client.indexAsync(request, RequestOptions.DEFAULT, listener); 

IndexResponse结果

String index = indexResponse.getIndex();
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 首次创建文档的处理
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 已经存在的文档的更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 执行成功的分片数少于总分片数时,在此处处理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 失败处理
        String reason = failure.reason(); 
    }
}

如果版本冲突,则会抛出ElasticsearchException

IndexRequest request = new IndexRequest("posts")
    .id("1")
    .source("field", "value")
    .setIfSeqNo(10L)
    .setIfPrimaryTerm(20);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 此处说明抛出了版本冲突异常
    }
}

如果opType被设置为create,但是要新增的数据已经在索引中存在相同id的文档,也会抛出上述异常。

IndexRequest request = new IndexRequest("posts")
    .id("1")
    .source("field", "value")
    .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
                // 此处说明抛出了版本冲突异常
    }
}

Get API 文档查询

使用GetRequest,可以使用SearchRequest取代该API。具体参考官方文档。

Get Source API 文档source字段查询

使用GetSourceRequest,可以使用SearchRequest中的SearchSourceBuilder的fetchSource方法取代。具体用法参考官方文档。

Exists API 文档是否存在查询

使用方式同Get API,同样使用GetRequest。查询的文档存在时返回true,否则返回false。

因为exists()方法只返回boolean类型,推荐关闭获取_source字段及所有存储字段(stored fields),这样请求会更轻量。

// 设置请求的索引和文档ID
GetRequest getRequest = new GetRequest(
    "posts", 
    "1");    
// 关闭获取_source字段
getRequest.fetchSourceContext(new FetchSourceContext(false)); 
// 关闭获取stored fields
getRequest.storedFields("_none_");     

执行操作

// 同步
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
// 异步,listener泛型为Boolean
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener); 

Delete API 删除文档

Delete Request

文档删除请求使用DeleteRequest,包括索引和文档ID两个参数

DeleteRequest request = new DeleteRequest(
        "posts",    // 索引
        "1");  // 文档ID

可选参数

// 路由参数
request.routing("routing"); 

// 以TimeValue形式设置主分片超时时间
request.timeout(TimeValue.timeValueMinutes(2)); 
// 以String形式设置主分片超时时间
request.timeout("2m");

// 使用WriteRequest.RefreshPolicy实例设置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
// 使用String设置刷新策略
request.setRefreshPolicy("wait_for"); 

// 设置version
request.version(2); 
// 设置version type
request.versionType(VersionType.EXTERNAL); 

执行操作

// 同步
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
// 异步, ActionListener 泛型为DeleteResponse
client.deleteAsync(request, RequestOptions.DEFAULT, listener); 

DeleteResponse删除结果

返回执行删除操作的基本信息

String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
     // 执行成功的分片数少于总分片数时,在此处处理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 失败处理
        String reason = failure.reason(); 
    }
}

可以从结果中获取是否找到文档

DeleteRequest request = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
        request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 没找到文档,执行相应处理
}

如果版本冲突,则抛出异常ElasticsearchException

try {
    DeleteResponse deleteResponse = client.delete(
        new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
            RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突异常处理
    }
}

Update API 更新文档

Update Request

更新文档使用UpdateRequest,包括索引和文档ID两个参数。

UpdateRequest request = new UpdateRequest(
        "posts",  // 索引
        "1"); // 文档ID

Update API 允许使用脚本更新或者更新部分文档信息。

Update with a script

// Script parameters provided as a Map of objects
Map<String, Object> parameters = singletonMap("count", 4); 
// Create an inline script using the painless language and the previous parameters
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  
// Sets the script to the update request
request.script(inline);  

或者使用stored script

// Reference to a script stored under the name increment-field in the painless language
Script stored = new Script(
        ScriptType.STORED, null, "increment-field", parameters);  
// Sets the script in the update request
request.script(stored);  

Updates with a partial document

使用该方式时,会将需要更新的部分文档与现有文档合并

方式一 JSONString形式的部分文档更新

UpdateRequest request = new UpdateRequest("posts", "1");
// Partial document as a JSON string; inner double quotes are escaped so the literal compiles.
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
// Partial document source provided as a String in JSON format
request.doc(jsonString, XContentType.JSON);

方式二 map形式的部分文档更新

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
// Partial document source provided as a Map which gets automatically converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc(jsonMap); 

方式三 XContentBuilder形式的部分文档更新

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
// Partial document source provided as an XContentBuilder object, the Elasticsearch built-in helpers to generate JSON content
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc(builder);  

方式四 key-pairs形式的部分文档更新

// Partial document source provided as Object key-pairs, which gets converted to JSON format
UpdateRequest request = new UpdateRequest("posts", "1")
        .doc("updated", new Date(),
             "reason", "daily update");

Upserts

如果需要更新的文档不存在,可以使用upsert方法插入新文档。

// Upsert document source provided as a String (inner double quotes escaped)
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

与上面的partial document更新一样,upsert方法也可以采用String, Map, XContentBuilder 或者Object key-pairs方式。

可选参数

// Routing value
request.routing("routing");

// Primary-shard timeout as a TimeValue
request.timeout(TimeValue.timeValueMinutes(2));
// Primary-shard timeout as a String
request.timeout("2m");

// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");

// Note: UpdateRequest does not support version()/versionType(); in 7.x both throw
// UnsupportedOperationException. Use setIfSeqNo/setIfPrimaryTerm (below) for
// optimistic concurrency control instead.

// How many times to retry if the document is changed by another operation during the update
request.retryOnConflict(3);

// Return the updated document's _source in the response (disabled by default)
request.fetchSource(true);

// Return only selected _source fields: "updated" plus any field starting with "r"
request.fetchSource(
        new FetchSourceContext(true, new String[]{"updated", "r*"}, Strings.EMPTY_ARRAY));

// Alternatively, return every _source field except "updated"
request.fetchSource(
        new FetchSourceContext(true, Strings.EMPTY_ARRAY, new String[]{"updated"}));

// Only apply the update if the document still has this sequence number ...
request.setIfSeqNo(2L);
// ... and this primary term
request.setIfPrimaryTerm(1L);

// Disable noop detection
request.detectNoop(false);

// Run the script whether or not the document already exists
request.scriptedUpsert(true);

// If the document does not exist, use the partial document as the upsert document
request.docAsUpsert(true);

// Number of active shard copies required before the update proceeds
request.waitForActiveShards(2);
// Or as an ActiveShardCount: ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (the default)
request.waitForActiveShards(ActiveShardCount.ALL);

执行操作

// 同步
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

// 异步。listener的泛型为UpdateResponse
client.updateAsync(request, RequestOptions.DEFAULT, listener); 

UpdateResponse更新结果

UpdateResponse获取更新操作的执行情况:

String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 首次创建或upsert
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文档被更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文档被删除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 未对已有文档造成影响
}

如果UpdateRequest允许通过fetchSource方法获取source,则UpdateResponse会返回更新文档的source信息:

// 以GetResult对象获取被更新的文档
GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    // 以String形式获取被更新文档的source
    String sourceAsString = result.sourceAsString(); 
    // 以Map<String, Object>形式获取被更新文档的source
    Map<String, Object> sourceAsMap = result.sourceAsMap(); 
    // 以byte[]形式获取被更新文档的source
    byte[] sourceAsBytes = result.source(); 
} else {
    // 处理响应中没有source的情形(默认行为)
}

还可以在响应中检查分片失败信息:

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 执行成功的分片数少于总分片数时,在此处处理
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure :
            shardInfo.getFailures()) {
        // 处理失败信息
        String reason = failure.reason(); 
    }
}

如果UpdateRequest要更新的文档不存在,服务端会返回404,客户端抛出ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(
            request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 处理由于文档不存在导致的异常
    }
}

如果版本冲突,会抛出ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "1")
        .doc("field", "value")
        .setIfSeqNo(101L)
        .setIfPrimaryTerm(200L);
try {
    UpdateResponse updateResponse = client.update(
            request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 版本冲突导致异常
    }
}

Bulk API 批量操作

实际项目中,批量操作更常用。

Java High Level REST Client提供了Bulk Processor结合BulkRequest使用。

Bulk Request

一个BulkRequest能够执行多个index/update/delete操作。

该请求至少需要有一个操作。

// 创建BulkRequest
BulkRequest request = new BulkRequest(); 
// 添加创建文档操作请求 IndexRequest
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
// 添加第2个创建文档操作请求 IndexRequest
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
// 添加第3个创建文档操作请求 IndexRequest
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));

注意,Bulk API 只支持JSON或SMILE编码格式,使用其他格式的文档会报错。

不同的操作可以添加到同一个BulkRequest。

BulkRequest request = new BulkRequest();
// Add a DeleteRequest
request.add(new DeleteRequest("posts", "3"));
// Add an UpdateRequest
request.add(new UpdateRequest("posts", "2")
        .doc(XContentType.JSON, "other", "test"));
// Add an IndexRequest. It uses JSON like the requests above: the high level client rejects
// a bulk whose requests mix content types (e.g. JSON and SMILE).
request.add(new IndexRequest("posts").id("4")
        .source(XContentType.JSON, "field", "baz"));

可选参数


// Primary-shard timeout as a TimeValue
request.timeout(TimeValue.timeValueSeconds(1));
// Primary-shard timeout as a String
request.timeout("1s");

// Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// Refresh policy as a String
request.setRefreshPolicy("wait_for");

// BulkRequest has no version()/versionType(): set the version on each individual
// IndexRequest/DeleteRequest added to the bulk instead.

// Global pipeline, applied to every sub-request unless the sub-request sets its own pipeline
request.pipeline("pipelineId");

// Number of active shard copies required before the index/update/delete operations proceed
request.waitForActiveShards(2);
// Or as an ActiveShardCount: ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (the default)
request.waitForActiveShards(ActiveShardCount.ALL);

// Global routing, applied to every sub-request
request.routing("routingId");

// Global index, used by every sub-request that does not set its own index. It is @Nullable
// and can only be set when the BulkRequest is constructed.
BulkRequest defaulted = new BulkRequest("posts");

执行操作

// 同步
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

// 异步。listener泛型BulkResponse
client.bulkAsync(request, RequestOptions.DEFAULT, listener); 

BulkResponse批量执行结果

BulkResponse包含执行操作的信息,可以迭代获取:

// Iterate over the results of all operations
for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    // Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    // Handle the response of an index operation
    case CREATE:   
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   // Handle the response of a update operation
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   // Handle the response of a delete operation
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

Bulk response提供了一个快捷检查是否有操作执行失败的方法:

// 至少有一个执行失败时,返回true
if (bulkResponse.hasFailures()) { 

}

如果有执行失败的,则需要迭代获取错误,并处理:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    // 判断操作是否失败
    if (bulkItemResponse.isFailed()) { 
        // 如果失败,则获取失败信息
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}

Bulk Processor

BulkProcessor简化了Bulk API的使用。通过提供工具类,允许index/update/delete 操作添加到Processor中后,透明执行。

为了执行这些请求,BulkProcessor需要如下部分:

RestHighLevelClient:用于执行BulkRequest和获取结果BulkResponse

BulkProcessor.Listener:当一个BulkRequest执行失败或执行完成后调用listener

之后,BulkProcessor.builder可以用来创建一个新的BulkProcessor。

// Create the BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // This method is called before each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // This method is called after each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // This method is called when a BulkRequest failed
    }
};

// Create the BulkProcessor by calling the build() method from the BulkProcessor.Builder. The RestHighLevelClient.bulkAsync() method will be used to execute the BulkRequest under the hood.
BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();

BulkProcessor.Builder提供了配置BulkProcessor如何处理请求的方法:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
// Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)
builder.setBulkActions(500); 
// Set when to flush a new bulk request based on the size of actions currently added (defaults to 5Mb, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
// Set the number of concurrent requests allowed to be executed (default to 1, use 0 to only allow the execution of a single request)
builder.setConcurrentRequests(0); 
// Set a flush interval flushing any BulkRequest pending if the interval passes (defaults to not set)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
// Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 

创建完BulkProcessor后,可以向其中添加操作请求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

这些请求会被BulkProcessor执行,且每个bulk 请求后会调用 BulkProcessor.Listener。

该listener提供了处理BulkRequest 和BulkResponse的途径:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called before each execution of a BulkRequest, this method allows to know the number of operations that are going to be executed within the BulkRequest
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // Called after each execution of a BulkRequest, this method allows to know if the BulkResponse contains errors
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // Called if the BulkRequest failed, this method allows to know the failure
        logger.error("Failed to execute bulk", failure); 
    }
};

当所有请求添加到BulkProcessor后,它的实例需要使用两个可用的关闭方法的任一个关闭:

// 如果所有请求执行完成返回true,如果请求执行超时,则返回false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

close()方法可以用来立即关闭BulkProcessor:

bulkProcessor.close();

上述两个关闭方法都会在关闭前刷新(flush)processor中已添加的请求,关闭后无法再向processor中添加新的请求。

Multi-Get API 批量获取

Multi-Get API可以在单个HTTP请求中并行执行多个get请求。

Reindex API 文档复制

ReindexRequest用于从一个或多个索引中复制文档到新的目标索引中。

Update By Query API批量更新文档

UpdateByQueryRequest

UpdateByQueryRequest用于更新一个索引中的多个文档。

一个最简单的UpdateByQueryRequest如下:

// 在一组索引上创建UpdateByQueryRequest
UpdateByQueryRequest request =  new UpdateByQueryRequest("source1", "source2"); 

默认情况下版本冲突会中断UpdateByQueryRequest的执行,但是可以使用下面的设置,只进行计数

request.setConflicts("proceed");

可以通过添加一个query限制这些文档

// 只处理user字段值为kimchy的文档
request.setQuery(new TermQueryBuilder("user", "kimchy")); 

可以通过设置maxDocs限制处理文档的最大数量

request.setMaxDocs(10); 

默认情况下,UpdateByQueryRequest一批处理1000条文档,可以通过setBatchSize修改。

request.setBatchSize(100); 

可以利用ingest 特性,指定一个pipeline

request.setPipeline("my_pipeline"); 

UpdateByQueryRequest支持使用脚本修改文档。

// Increment the likes field of every document whose user is kimchy.
// Painless string literals use plain ASCII quotes ('kimchy').
request.setScript(
    new Script(
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));

UpdateByQueryRequest可以通过setSlices使用sliced-scroll实现并行化。

request.setSlices(2); 

UpdateByQueryRequest使用scroll参数来控制search context的生命周期。

request.setScroll(TimeValue.timeValueMinutes(10)); 

如果提供了路由,则路由会复制到scroll query中,用以限制匹配该路由值的分片。

request.setRouting("=cat"); 

可选参数

除了上面的配置,还有一些配置参数。

// 批量更新超时时间
request.setTimeout(TimeValue.timeValueMinutes(2)); 
// 调用更新操作后刷新索引
request.setRefresh(true); 
// 设置索引选项
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); 

执行操作

// 同步
BulkByScrollResponse bulkResponse =
        client.updateByQuery(request, RequestOptions.DEFAULT);
// 异步,listener泛型为BulkByScrollResponse
client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); 

UpdateByQueryResponse 批量更新结果

UpdateByQueryRequest的执行结果为BulkByScrollResponse,可以通过以下方法获取批量更新的基本信息:

// Get total time taken
TimeValue timeTaken = bulkResponse.getTook(); 
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut(); 
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal(); 
// Number of docs that were updated
long updatedDocs = bulkResponse.getUpdated(); 
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted(); 
// Number of batches that were executed
long batches = bulkResponse.getBatches(); 
// Number of skipped docs
long noops = bulkResponse.getNoops(); 
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts(); 
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries(); 
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries(); 
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
        bulkResponse.getStatus().getThrottledUntil(); 
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkResponse.getSearchFailures(); 
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
        bulkResponse.getBulkFailures();

Delete By Query Request批量删除文档

DeleteByQueryRequest

DeleteByQueryRequest用于删除一个索引中的多个文档。需要存在一个或多个索引。

最简单的DeleteByQueryRequest如下,删除一个索引中的所有文档。

DeleteByQueryRequest request =
        new DeleteByQueryRequest("source1", "source2"); 

默认情况下版本冲突会中断DeleteByQueryRequest的执行,但是可以使用下面的设置,只进行计数

request.setConflicts("proceed");

可以通过添加一个query限制这些文档

// 只处理user字段值为kimchy的文档
request.setQuery(new TermQueryBuilder("user", "kimchy")); 

可以通过设置maxDocs限制处理文档的最大数量

request.setMaxDocs(10); 

默认情况下,DeleteByQueryRequest一批处理1000条文档,可以通过setBatchSize修改。

request.setBatchSize(100); 

DeleteByQueryRequest可以通过setSlices使用sliced-scroll实现并行化。

request.setSlices(2); 

DeleteByQueryRequest使用scroll参数来控制search context的生命周期。

request.setScroll(TimeValue.timeValueMinutes(10)); 

如果提供了路由,则路由会复制到scroll query中,用以限制匹配该路由值的分片。

request.setRouting("=cat"); 

可选参数

除了上面的配置,还有一些配置参数。

// Timeout for the delete-by-query request
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh the index once the deletions have been performed
request.setRefresh(true);
// Indices options
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

执行操作

// 同步
BulkByScrollResponse bulkResponse =
        client.deleteByQuery(request, RequestOptions.DEFAULT);
// 异步, listener泛型BulkByScrollResponse
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); 

DeleteByQueryResponse批量删除结果

DeleteByQueryRequest的执行结果同样为BulkByScrollResponse,可以通过以下方法获取批量删除的执行信息:

// Get total time taken
TimeValue timeTaken = bulkResponse.getTook(); 
// Check if the request timed out
boolean timedOut = bulkResponse.isTimedOut(); 
// Get total number of docs processed
long totalDocs = bulkResponse.getTotal(); 
// Number of docs that were deleted
long deletedDocs = bulkResponse.getDeleted(); 
// Number of batches that were executed
long batches = bulkResponse.getBatches(); 
// Number of skipped docs
long noops = bulkResponse.getNoops(); 
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts(); 
// Number of times request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries(); 
// Number of times request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries(); 
// The total time this request has throttled itself not including the current throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
// Remaining delay of any current throttle sleep or 0 if not sleeping
TimeValue throttledUntilMillis =
        bulkResponse.getStatus().getThrottledUntil(); 
// Failures during search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkResponse.getSearchFailures(); 
// Failures during bulk index operation
List<BulkItemResponse.Failure> bulkFailures =
        bulkResponse.getBulkFailures(); 

Rethrottle API

Multi Term Vectors API

以上是关于ESJava High Level REST Client官方索引和文档操作指导的主要内容,如果未能解决你的问题,请参考以下文章

elasticsearch 7.7.0 最新版+Java High Level REST Client测试

Elasticsearch java api操作(Java High Level Rest Client)

Java High Level REST Client 使用地理位置查询

Elasticsearch High Level Rest Client 发起请求的过程分析

SpringBoot整合ElasticSearch之Java High Level REST Client

使用Java High Level REST Client操作elasticsearch