Elasticsearch:在 Java 应用中创建 mappings 及进行批量写入 - Java client 8.x
Posted Elastic 中国社区官方博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:在 Java 应用中创建 mappings 及进行批量写入 - Java client 8.x相关的知识,希望对你有一定的参考价值。
在我之前的文章 “Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”,我详细地描述了如何在 Java 客户端应用中创建一个索引并对它进行搜索。在那个例子里,我们并没有描述如何创建 mappings。最近,我看到有开发者在评论区里留言想知道如何创建 mappings 并使用 _bulk 来进行批量写入。今天的文章,我是继先前的文章 “Elasticsearch:使用 Elasticsearch Java client 8.0 来连接带有 HTTPS 的集群” 来进行的。在 Elastic Stack 8.x 平台,开始引入 HTTPS 的访问,所以前面的那篇文章是最好的开始。
我将使用 Elastic Stack 8.2 老进行展示。为了方便大家的学习,你可以在地址 GitHub - liu-xiao-guo/ElasticsearchJava-mapping-bulk8 下载下面所讲内容的源码。关于 Java client API 的查找,你可以参考链接 Overview (java-client 8.2.2 API)
如何在 Java 应用中创建 mappings 及进行批量写入
如何在 Java 应用中创建 mappings 及进行批量写入_哔哩哔哩_bilibili
创建 mappings
其实在最新的 Java 客户端 API 中,它的使用非常像极了我们常见的在 console 中的命令。比如,我们可以使用如下的命令来创建一个索引:
PUT test
"mappings":
"properties":
"id":
"type": "keyword"
,
"name":
"type": "text",
"fields":
"keyword":
"type": "keyword",
"ignore_above": 256
,
"price":
"type": "long"
显然在上面,我们可以看见有一个叫做 mappings 的字段,当然它是操作与一个索引 test 上的。很自然的,在使用请求的时候,我们需要创建 mappings 这个定义。
首先,我们来把代码直接贴出来:
ElasticsearchIndicesClient indices = client.indices();
// Firstly remove "products" if it exists
try
DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder()
.index("products")
.build();
DeleteIndexResponse delete_response = indices.delete(delete_request);
System.out.println(delete_response.acknowledged());
catch (Exception e)
// e.printStackTrace();
// Secondly remove "test" if it exists
try
DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder()
.index("test")
.build();
DeleteIndexResponse delete_response = indices.delete(delete_request);
System.out.println(delete_response.acknowledged());
catch (Exception e)
// e.printStackTrace();
在上面,它们相当于如下的指令:
DELETE products
DELETE test
为了确保下面的命令成功,我们首先删除已经创建过的 products 及 test 索引,如果它们之前已经被创建过。这是一个很简单的操作,就是发送一个 DELETE 指令。
接下来,我们在一个文件中定义一个如下的 mappings:
mappings.json
"properties" :
"id" :
"type" : "keyword"
,
"name" :
"type" : "text",
"fields" :
"keyword" :
"type" : "keyword",
"ignore_above" : 256
,
"price" :
"type" : "long"
这是我们想要的一个 mapping。它定义了索引中想要的字段的类型。
我们接着使用如下的命令来创建一个叫做 test 的索引:
String mappingPath = System.getProperty("user.dir") + "/mappings.json";
JsonpMapper mapper = client._transport().jsonpMapper();
String mappings_str = new String(Files.readAllBytes(Paths.get(mappingPath)));
System.out.println("mappings are: " + mappings_str);
JsonParser parser = mapper.jsonProvider()
.createParser(new StringReader( mappings_str ));
client.indices()
.create(createIndexRequest -> createIndexRequest.index("test")
.mappings(TypeMapping._DESERIALIZER.deserialize(parser, mapper)));
首先,我们读入 mappings.json 文件,并使用 client.indices 来创建 test 索引。显然和我们的 console 中的命令很相似。从一个叫做 test 的索引中创建一个 mappings。
当我们成功运行上面的命令后,我们可以在 Kibana 中进行查看:
GET test/_mapping
它的响应为:
"test" :
"mappings" :
"properties" :
"id" :
"type" : "keyword"
,
"name" :
"type" : "text",
"fields" :
"keyword" :
"type" : "keyword",
"ignore_above" : 256
,
"price" :
"type" : "long"
接下来,我们使用另外一种方法来创建 mappings:
String mappings = "\\n" +
" \\"properties\\" : \\n" +
" \\"id\\" : \\n" +
" \\"type\\" : \\"keyword\\" \\n" +
" ,\\n"+
" \\"name\\" : \\n" +
" \\"type\\" : \\"text\\",\\n" +
" \\"fields\\" : \\n" +
" \\"keyword\\" : \\n" +
" \\"type\\" : \\"keyword\\",\\n" +
" \\"ignore_above\\" : 256 \\n" +
" \\n" +
" \\n" +
" , \\n" +
" \\"price\\" : \\n" +
" \\"type\\" : \\"long\\" \\n" +
" \\n" +
" \\n" +
"\\n";
System.out.println( "mappings are: " + mappings );
JsonpMapper mapper1 = client._transport().jsonpMapper();
JsonParser parser1 = Json.createParser(new StringReader(mappings));
CreateIndexRequest request_create = new CreateIndexRequest.Builder()
.index("products")
.mappings(TypeMapping._DESERIALIZER.deserialize(parser1, mapper1))
.build();
CreateIndexResponse response_create = indices.create(request_create);
System.out.println(response_create.acknowledged());
在上面,我使用了一个字符串 mappings 来定义索引 products 的 mappings。我们可以通过打印的方法来检查字符串的输出是否正确。我们必须确保这个字符串输出和我们在 console 里的一致性,否则会造成错误。接下来的代码和之前的一样。
当我们成功运行上面的代码后,我们可以在 Kibana 中进行查看:
GET products/_mapping
它的响应是:
"products" :
"mappings" :
"properties" :
"id" :
"type" : "keyword"
,
"name" :
"type" : "text",
"fields" :
"keyword" :
"type" : "keyword",
"ignore_above" : 256
,
"price" :
"type" : "long"
显然它是按照我们的需求来创建的 mappings。
使用 _bulk 来进行批量写入
在我们写入的时候,_bulk 指令可以一次性大量写入我们的文档,从而提高效率。那么我们该如何在 Java 里进行实现呢?
Product prod1 = new Product("prod1", "washing machine", 42);
Product prod2 = new Product("prod2", "TV", 42);
List<Product> products = new ArrayList<Product>();
products.add( prod1 );
products.add( prod2 );
BulkRequest.Builder br = new BulkRequest.Builder();
for (Product product : products)
br.operations(op -> op
.index(idx -> idx
.index("products")
.id(product.getId())
.document(product)
)
);
BulkResponse result = client.bulk(br.build());
if (result.errors())
System.out.println("Bulk had errors");
for (BulkResponseItem item: result.items())
if (item.error() != null)
System.out.println(item.error().reason());
实际上它的操作也非常简单。如上面的代码所示,我们首先创建两个文档的列表,然后使用 BulkRequest 来创建请求。在请求中,我们使用了 index 的操作。上面的这个操作非常像如下的这个命令:
POST _bulk
"index" : "_index" : "produtcs", "_id" : "prod1"
"id" : "prod1", "name": "washing machine", "price": 42
"index" : "_index" : "produtcs", "_id" : "prod2"
"id" : "prod2", "name": "TV", "price": 42
运行上面的命令后,我们执行如下的命令来进行查看:
GET products/_search?filter_path=**.hits
上面命令显示的结果为:
"hits" :
"hits" : [
"_index" : "products",
"_id" : "prod1",
"_score" : 1.0,
"_source" :
"id" : "prod1",
"name" : "washing machine",
"price" : 42
,
"_index" : "products",
"_id" : "prod2",
"_score" : 1.0,
"_source" :
"id" : "prod2",
"name" : "TV",
"price" : 42
]
显然我们的写入是成功的,并且是一次请求,同时写入了两个文档。在实际的使用中,我们可以写入成百上千的文档。
好了,进行的文章就分享到这里。希望大家学到知识!
以上是关于Elasticsearch:在 Java 应用中创建 mappings 及进行批量写入 - Java client 8.x的主要内容,如果未能解决你的问题,请参考以下文章
Elasticsearch:运用 Java 创建索引并写入数据
运行时:无法在 Docker 中创建新的操作系统线程(已经有 2 个;errno=22):Elasticsearch