How to quickly bulk-add data to Elasticsearch


Preface: This article was compiled by the editors of 小常识网 (cha138.com) and mainly covers how to quickly bulk-add data to Elasticsearch. We hope you find it useful.

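Reference answer A: Just use the 超强点击猫 (Super Click Cat) software. It can simulate all kinds of batch actions and bulk-import text data, and it is very convenient.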

Go Elasticsearch: Quick Start for Adding Documents



Once an ES index has been created, you can start adding records to it.

1. Creating a single document

1.1 Golang

Use IndexService to create a document with a specified ID.

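// Create2ES adds a document to ES (GetESClient is assumed to return a *elastic.Client).
func Create2ES(ctx context.Context, index, id, json string) error {
	_, err := GetESClient().Index().Index(index).
		OpType("create"). // fail with 409 Conflict if the ID already exists
		Id(id).
		BodyJson(json).
		Refresh("true"). // refresh the affected shards immediately
		Do(ctx)
	return err
}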

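index is the index name; id uniquely identifies a record, i.e. a document; json is the document's raw JSON source. Refresh("true") refreshes the affected primary and replica shards immediately after the write, so the new document is searchable right away; passing "wait_for" instead makes the request wait until the next refresh has made the change visible. A refresh makes recent changes searchable; it is not a flush to disk. Elasticsearch automatically refreshes the shards of every index that has changed once per index.refresh_interval, which defaults to 1 second.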

The official documentation describes the possible values of refresh as follows:

Empty string or true
Refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately. This should ONLY be done after careful thought and verification that it does not lead to poor performance, both from an indexing and a search standpoint.

wait_for
Wait for the changes made by the request to be made visible by a refresh before replying. This doesn’t force an immediate refresh, rather, it waits for a refresh to happen. Elasticsearch automatically refreshes shards that have changed every index.refresh_interval which defaults to one second. That setting is dynamic. Calling the Refresh API or setting refresh to true on any of the APIs that support it will also cause a refresh, in turn causing already running requests with refresh=wait_for to return.

false (the default)
Take no refresh related actions. The changes made by this request will be made visible at some point after the request returns.

Note: creating a document whose ID already exists fails with elastic: Error 409 (Conflict).

After a successful write, you can view the newly written document in Kibana through the RESTful API:

GET /es_index_userinfo/_doc/1


_doc is the document type and 1 is the document ID.
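The document itself comes back under _source, alongside metadata such as _index, _id and found. A minimal sketch of decoding such a reply with encoding/json, assuming the usual 7.x response shape; getDocResponse and decodeGetDoc are hypothetical names. Keeping _source as json.RawMessage leaves the choice of target type to the caller.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getDocResponse holds the fields of a GET /<index>/_doc/<id> reply used here.
type getDocResponse struct {
	Index  string          `json:"_index"`
	ID     string          `json:"_id"`
	Found  bool            `json:"found"`
	Source json.RawMessage `json:"_source"`
}

// decodeGetDoc parses the reply and returns the raw _source bytes,
// or an error when the document was not found.
func decodeGetDoc(body []byte) (json.RawMessage, error) {
	var r getDocResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	if !r.Found {
		return nil, fmt.Errorf("document %s/%s not found", r.Index, r.ID)
	}
	return r.Source, nil
}
```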

1.2 RESTful API

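Of course, we can also write the document through the RESTful API. Note that PUT /<index>/_doc/<id> creates the document or replaces it if it already exists; to get the same 409 Conflict on duplicates as OpType("create"), use PUT /es_index_userinfo/_create/1 instead.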

PUT /es_index_userinfo/_doc/1
{
	"id":	1,
	"username":	"alice",
	"nickname": "cat",
	"phone":18819994334,
	"age":	18,
	"ancestral": "安徽",
	"identity":"12345678",
	"update_time":1627522828,
	"create_time":1627522828
}
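In Go, such a document is usually built from a struct rather than a hand-written string. A minimal sketch, assuming a hypothetical UserInfo type whose JSON tags mirror the fields of the request above; the returned string could be passed as the json argument of Create2ES. The timestamp fields are assumed to hold Unix seconds.

```go
package main

import "encoding/json"

// UserInfo is a hypothetical struct mirroring the es_index_userinfo document.
type UserInfo struct {
	ID         int64  `json:"id"`
	Username   string `json:"username"`
	Nickname   string `json:"nickname"`
	Phone      int64  `json:"phone"` // 18819994334 does not fit in int32
	Age        int    `json:"age"`
	Ancestral  string `json:"ancestral"`
	Identity   string `json:"identity"`
	UpdateTime int64  `json:"update_time"` // Unix timestamp in seconds (assumed)
	CreateTime int64  `json:"create_time"` // Unix timestamp in seconds (assumed)
}

// userDocJSON serializes u into a JSON source string for indexing.
func userDocJSON(u UserInfo) (string, error) {
	b, err := json.Marshal(u)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```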

2. Creating documents in bulk

2.1 Golang

Use BulkService to create multiple documents with specified IDs in a single request.

// BulkCreate2ES creates documents in bulk (partial failures are allowed)
func BulkCreate2ES(ctx context.Context, index string, m map[string]string) error {
	bulkService := GetESClient().Bulk().Index(index).Refresh("true")

	// Add one create request per document
	for id, doc := range m {
		bulkService.Add(elastic.NewBulkCreateRequest().
			Index(index).
			Id(id).
			Doc(doc))
	}
	res, err := bulkService.Do(ctx)
	if err != nil {
		return err
	}
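	// Only the item count is checked; individual failures are reported per item in res.Items.
	if len(res.Items) != len(m) {
		return errors.New("response item count does not match request document count")
	}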
	return nil
}

Usage example:

ctx := context.Background()
m := map[string]string{
	"6": `{"id":6,"username":"ada","age":20}`,
	"7": `{"id":7,"username":"angela","age":21}`,
}
if err := BulkCreate2ES(ctx, "es_index_userinfo", m); err != nil {
	log.Println(err)
}

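Note: whether each document was created successfully is reported in its own BulkResponseItem in the response. For example, the response res from the call above was: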

{
        "took": 12,
        "errors": true,
        "items": [
                {
                        "create": {
                                "_index": "es_index_userinfo",
                                "_type": "_doc",
                                "_id": "6",
                                "status": 409,
                                "error": {
                                        "type": "version_conflict_engine_exception",
                                        "reason": "[6]: version conflict, document already exists (current version [1])",
                                        "index": "es_index_userinfo"
                                }
                        }
                },
                {
                        "create": {
                                "_index": "es_index_userinfo",
                                "_type": "_doc",
                                "_id": "7",
                                "_version": 1,
                                "result": "created",
                                "_shards": {
                                        "total": 2,
                                        "successful": 2,
                                        "failed": 0
                                },
                                "_seq_no": 25,
                                "_primary_term": 1,
                                "status": 201,
                                "forced_refresh": true
                        }
                }
        ]
}

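Here "errors": true means at least one operation failed. An item whose status is 2XX succeeded; any other status means it failed, and the item carries detailed error information.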
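Because a bulk request can partially fail while the HTTP call itself succeeds, callers usually inspect every item. A minimal standard-library sketch, assuming the response shape shown above; bulkItem, bulkResponse and failedIDs are hypothetical names. If you stay with olivere/elastic, its BulkResponse also offers a Failed() method for the same purpose.

```go
package main

import "encoding/json"

// bulkItem keeps the per-operation fields used here.
type bulkItem struct {
	ID     string          `json:"_id"`
	Status int             `json:"status"`
	Error  json.RawMessage `json:"error,omitempty"`
}

// bulkResponse mirrors the top level of a _bulk reply. Each item is a
// single-key object keyed by its action (create, index, update, delete).
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

// failedIDs returns the _id of every item whose status is not 2XX.
func failedIDs(body []byte) ([]string, error) {
	var r bulkResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	var ids []string
	if !r.Errors {
		return ids, nil // fast path: the top-level flag says nothing failed
	}
	for _, item := range r.Items {
		for _, res := range item {
			if res.Status < 200 || res.Status > 299 {
				ids = append(ids, res.ID)
			}
		}
	}
	return ids, nil
}
```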

2.2 RESTful API

The corresponding RESTful API request is:

POST es_index_userinfo/_bulk
{ "create" : {"_id":"6"}}
{"id":6,"username":"ada","age":20}
{ "create" : {"_id":"7"}}
{"id":7,"username":"angela","age":21}

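Here is a brief introduction to the _bulk syntax. Each operation is written as an action/metadata line, followed by a source (data) line for index, create and update; delete takes only the action line. The syntax is: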

POST _bulk
{"action": {"metadata"}}
{"data"}

# or
POST index/_bulk
{"action": {"metadata"}}
{"data"}

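The index can be given either in the URL or in the metadata. Each JSON object must sit on a single line (the body is newline-delimited JSON), and the last line must also end with a newline. action can be one of:

  • index: create the document, or replace it if it already exists
  • create: create the document; fails with 409 if it already exists
  • delete: delete the document
  • update: update the specified fields of the document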

The following example mixes several actions in one request:

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
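A body in this format is easy to generate programmatically. A minimal standard-library sketch, assuming a hypothetical bulkOp type: each operation becomes one action/metadata line plus, except for delete, one source line, and every line (including the last) ends with a newline as the Bulk API requires.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bulkOp is a hypothetical description of one bulk operation.
type bulkOp struct {
	Action string      // "index", "create", "update" or "delete"
	Index  string      // optional when the index is given in the URL
	ID     string      // optional for index (Elasticsearch then generates one)
	Source interface{} // document for index/create, {"doc": ...} for update; ignored for delete
}

// buildBulkBody renders ops as newline-delimited JSON for the _bulk endpoint.
func buildBulkBody(ops []bulkOp) ([]byte, error) {
	var buf bytes.Buffer
	for _, op := range ops {
		meta := map[string]string{}
		if op.ID != "" {
			meta["_id"] = op.ID
		}
		if op.Index != "" {
			meta["_index"] = op.Index
		}
		line, err := json.Marshal(map[string]map[string]string{op.Action: meta})
		if err != nil {
			return nil, err
		}
		buf.Write(line)
		buf.WriteByte('\n')
		switch op.Action {
		case "delete":
			// delete has no source line.
		case "index", "create", "update":
			src, err := json.Marshal(op.Source)
			if err != nil {
				return nil, err
			}
			buf.Write(src)
			buf.WriteByte('\n')
		default:
			return nil, fmt.Errorf("unknown bulk action %q", op.Action)
		}
	}
	return buf.Bytes(), nil
}
```

Feeding it the four operations above (index 1, delete 2, create 3, update 1 on index test) yields the same request body apart from whitespace. Because the metadata is a map, json.Marshal writes _id before _index; key order inside a JSON object does not matter to Elasticsearch.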

References

github.com/elastic/elasticsearch
github.com/olivere/elastic/v7
Elasticsearch Guide [7.15] » REST APIs » Document APIs » Bulk API
