RestHighLevelClient操作ES的API
Posted Firm陈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RestHighLevelClient操作ES的API相关的知识,希望对你有一定的参考价值。
Create Index API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
CreateIndexRequest request = new CreateIndexRequest("twitter_two");//创建索引
//创建的每个索引都可以有与之关联的特定设置。
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);
//创建索引时创建文档类型映射
request.mapping("tweet",//类型定义
" \\n" +
" \\"tweet\\": \\n" +
" \\"properties\\": \\n" +
" \\"message\\": \\n" +
" \\"type\\": \\"text\\"\\n" +
" \\n" +
" \\n" +
" \\n" +
" ",//类型映射,需要的是一个JSON字符串
XContentType.JSON);
//为索引设置一个别名
request.alias(
new Alias("twitter_alias")
);
//可选参数
request.timeout(TimeValue.timeValueMinutes(2));//超时,等待所有节点被确认(使用TimeValue方式)
//request.timeout("2m");//超时,等待所有节点被确认(使用字符串方式)
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));//连接master节点的超时时间(使用TimeValue方式)
//request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
request.waitForActiveShards(2);//在创建索引API返回响应之前等待的活动分片副本的数量,以int形式表示。
//request.waitForActiveShards(ActiveShardCount.DEFAULT);//在创建索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。
//同步执行
CreateIndexResponse createIndexResponse = client.indices().create(request);
//异步执行
//异步执行创建索引请求需要将CreateIndexRequest实例和ActionListener实例传递给异步方法:
//CreateIndexResponse的典型监听器如下所示:
//异步方法不会阻塞并立即返回。
ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>()
@Override
public void onResponse(CreateIndexResponse createIndexResponse)
//如果执行成功,则调用onResponse方法;
@Override
public void onFailure(Exception e)
//如果失败,则调用onFailure方法。
;
client.indices().createAsync(request, listener);//要执行的CreateIndexRequest和执行完成时要使用的ActionListener
//返回的CreateIndexResponse允许检索有关执行的操作的信息,如下所示:
boolean acknowledged = createIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
Delete Index API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
DeleteIndexRequest request = new DeleteIndexRequest("twitter_two");//指定要删除的索引名称
//可选参数:
request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引删除(使用TimeValue形式)
// request.timeout("2m"); //设置超时,等待所有节点确认索引删除(使用字符串形式)
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
// request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
//设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
request.indicesOptions(IndicesOptions.lenientExpandOpen());
//同步执行
DeleteIndexResponse deleteIndexResponse = client.indices().delete(request);
/* //异步执行删除索引请求需要将DeleteIndexRequest实例和ActionListener实例传递给异步方法:
//DeleteIndexResponse的典型监听器如下所示:
//异步方法不会阻塞并立即返回。
ActionListener<DeleteIndexResponse> listener = new ActionListener<DeleteIndexResponse>()
@Override
public void onResponse(DeleteIndexResponse deleteIndexResponse)
//如果执行成功,则调用onResponse方法;
@Override
public void onFailure(Exception e)
//如果失败,则调用onFailure方法。
;
client.indices().deleteAsync(request, listener);*/
//Delete Index Response
//返回的DeleteIndexResponse允许检索有关执行的操作的信息,如下所示:
boolean acknowledged = deleteIndexResponse.isAcknowledged();//是否所有节点都已确认请求
//如果找不到索引,则会抛出ElasticsearchException:
try
request = new DeleteIndexRequest("does_not_exist");
client.indices().delete(request);
catch (ElasticsearchException exception)
if (exception.status() == RestStatus.NOT_FOUND)
//如果没有找到要删除的索引,要执行某些操作
Open Index API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
OpenIndexRequest request = new OpenIndexRequest("twitter");//打开索引
//可选参数:
request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已打开(使用TimeValue形式)
// request.timeout("2m"); //设置超时,等待所有节点确认索引已打开(使用字符串形式)
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
// request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
request.waitForActiveShards(2);//在打开索引API返回响应之前等待的活动分片副本的数量,以int形式表示。
//request.waitForActiveShards(ActiveShardCount.ONE);//在打开索引API返回响应之前等待的活动分片副本的数量,以ActiveShardCount形式表示。
//设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
request.indicesOptions(IndicesOptions.strictExpandOpen());
//同步执行
OpenIndexResponse openIndexResponse = client.indices().open(request);
/*//异步执行打开索引请求需要将OpenIndexRequest实例和ActionListener实例传递给异步方法:
//OpenIndexResponse的典型监听器如下所示:
//异步方法不会阻塞并立即返回。
ActionListener<OpenIndexResponse> listener = new ActionListener<OpenIndexResponse>()
@Override
public void onResponse(OpenIndexResponse openIndexResponse)
//如果执行成功,则调用onResponse方法;
@Override
public void onFailure(Exception e)
//如果失败,则调用onFailure方法。
;
client.indices().openAsync(request, listener);*/
//Open Index Response
//返回的OpenIndexResponse允许检索有关执行的操作的信息,如下所示:
boolean acknowledged = openIndexResponse.isAcknowledged();//指示是否所有节点都已确认请求
boolean shardsAcknowledged = openIndexResponse.isShardsAcknowledged();//指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
Close Index API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
CloseIndexRequest request = new CloseIndexRequest("index");//关闭索引
//可选参数:
request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待所有节点确认索引已关闭(使用TimeValue形式)
// request.timeout("2m"); //设置超时,等待所有节点确认索引已关闭(使用字符串形式)
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));连接master节点的超时时间(使用TimeValue方式)
// request.masterNodeTimeout("1m");//连接master节点的超时时间(使用字符串方式)
//设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
request.indicesOptions(IndicesOptions.lenientExpandOpen());
//同步执行
CloseIndexResponse closeIndexResponse = client.indices().close(request);
/*//异步执行打开索引请求需要将CloseIndexRequest实例和ActionListener实例传递给异步方法:
//CloseIndexResponse的典型监听器如下所示:
//异步方法不会阻塞并立即返回。
ActionListener<CloseIndexResponse> listener = new ActionListener<CloseIndexResponse>()
@Override
public void onResponse(CloseIndexResponse closeIndexResponse)
//如果执行成功,则调用onResponse方法;
@Override
public void onFailure(Exception e)
//如果失败,则调用onFailure方法。
;
client.indices().closeAsync(request, listener); */
//Close Index Response
//返回的CloseIndexResponse 允许检索有关执行的操作的信息,如下所示:
boolean acknowledged = closeIndexResponse.isAcknowledged(); //指示是否所有节点都已确认请求
Single document APIs
Index API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
IndexRequest indexRequest1 = new IndexRequest(
"posts",//索引名称
"doc",//类型名称
"1");//文档ID
//==============================提供文档源========================================
//方式1:以字符串形式提供
String jsonString = "" +
"\\"user\\":\\"kimchy\\"," +
"\\"postDate\\":\\"2013-01-30\\"," +
"\\"message\\":\\"trying out Elasticsearch\\"" +
"";
indexRequest1.source(jsonString, XContentType.JSON);
//方式2:以Map形式提供
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
//Map会自动转换为JSON格式的文档源
IndexRequest indexRequest2 = new IndexRequest("posts", "doc", "1")
.source(jsonMap);
// 方式3:文档源以XContentBuilder对象的形式提供,Elasticsearch内部会帮我们生成JSON内容
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.field("user", "kimchy");
builder.field("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
builder.endObject();
IndexRequest indexRequest3 = new IndexRequest("posts", "doc", "1")
.source(builder);
//方式4:以Object key-pairs提供的文档源,它会被转换为JSON格式
IndexRequest indexRequest4 = new IndexRequest("posts", "doc", "1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
//===============================可选参数start====================================
indexRequest1.routing("routing");//设置路由值
indexRequest1.parent("parent");//设置parent值
//设置超时:等待主分片变得可用的时间
indexRequest1.timeout(TimeValue.timeValueSeconds(1));//TimeValue方式
indexRequest1.timeout("1s");//字符串方式
//刷新策略
indexRequest1.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式
indexRequest1.setRefreshPolicy("wait_for");//字符串方式
indexRequest1.version(2);//设置版本
indexRequest1.versionType(VersionType.EXTERNAL);//设置版本类型
//操作类型
indexRequest1.opType(DocWriteRequest.OpType.CREATE);//DocWriteRequest.OpType方式
indexRequest1.opType("create");//字符串方式, 可以是 create 或 update (默认)
//The name of the ingest pipeline to be executed before indexing the document
indexRequest1.setPipeline("pipeline");
//===============================执行====================================
//同步执行
IndexResponse indexResponse = client.index(indexRequest1);
//异步执行
//IndexResponse 的典型监听器如下所示:
//异步方法不会阻塞并立即返回。
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>()
@Override
public void onResponse(IndexResponse indexResponse)
//执行成功时调用。 Response以参数方式提供
@Override
public void onFailure(Exception e)
//在失败的情况下调用。 引发的异常以参数方式提供
;
//异步执行索引请求需要将IndexRequest实例和ActionListener实例传递给异步方法:
client.indexAsync(indexRequest2, listener);
//Index Response
//返回的IndexResponse允许检索有关执行操作的信息,如下所示:
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED)
//处理(如果需要)第一次创建文档的情况
else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED)
//处理(如果需要)文档被重写的情况
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful())
//处理成功分片数量少于总分片数量的情况
if (shardInfo.getFailed() > 0)
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures())
String reason = failure.reason();//处理潜在的失败
//如果存在版本冲突,则会抛出ElasticsearchException:
IndexRequest request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.version(1);
try
IndexResponse response = client.index(request);
catch(ElasticsearchException e)
if (e.status() == RestStatus.CONFLICT)
//引发的异常表示返回了版本冲突错误
//如果opType设置为创建但是具有相同索引,类型和ID的文档已存在,则也会发生同样的情况:
request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try
IndexResponse response = client.index(request);
catch(ElasticsearchException e)
if (e.status() == RestStatus.CONFLICT)
//引发的异常表示返回了版本冲突错误
Get API
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));
GetRequest getRequest = new GetRequest(
"posts"以上是关于RestHighLevelClient操作ES的API的主要内容,如果未能解决你的问题,请参考以下文章
Es7.x使用RestHighLevelClient进行查询操作
Es7.x使用RestHighLevelClient进行增删改和批量操作