Go Elasticsearch CRUD 快速入门

Posted 恋喵大鲤鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go Elasticsearch CRUD 快速入门相关的知识,希望对你有一定的参考价值。


1.简介

Elasticsearch(ES) 是一个基于 Apache Lucene 开源的分布式、高扩展、近实时的数据搜索与分析引擎,主要用于海量数据快速存储,实时检索,高效分析的场景。通过简单易用的 RESTful API,隐藏 Lucene 的复杂性,让全文搜索变得简单。

ES 功能总结有三点:

  • 分布式存储
  • 分布式搜索
  • 分布式分析

因为是分布式,可将海量数据分散到多台服务器上存储,检索和分析,只要是海量数据需要完成上面这三种操作的业务场景,一般都会考虑使用 ES,比如维基百科,Stack Overflow,GitHub 后台均有使用。
GitHub。

2.特点

ES 为什么这么受欢迎,得益于其相较于传统数据库所拥有的强大功能。

  • ES 不是什么新技术,主要是将全文检索、数据分析以及分布式技术结合在一起,形成了独一无二的 ES;
  • 数据库的功能面对很多领域是不够用的,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理;ES 作为传统数据库的一个补充,提供了数据库所不不能提供的很多功能;
  • 可以作为一个大型分布式集群(数百台服务器)技术,处理 PB 级数据,服务大公司;也可以运行在单机上,服务小公司;
  • 对用户而言,开箱即用,非常简单,作为中小型的应用,分钟级部署,就可以作为生产环境的系统来使用了。

3 Kibana

说到 ES 必须要提一下 Kibana 。

ES 和 Logstash,Kibana 共同组成 ELK,ELK 是这三个开源项目的首字母缩写。ES 是一个搜索和分析引擎,Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 ES 等存储库中。Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。

Kibana 以 Web 后台的形式提供了一个可视化操作 ES 的系统,支持根据 ES 数据绘制图表,支持 ES 查询语法自动补全等高级特性,更加方便了我们操作 ES。

4.存储结构

要想完成对 ES 的增删改查,必须先了解一下 ES 的存储结构。

大家对 mysql 的存储结构应该是很清楚的,所以咱们在学习 ES 存储结构时,同时类比 MySQL,这样理解起来会更透彻。MySQL 的数据模型由数据库、表、字段、字段类型组成,自然 ES 也有自己的一套存储结构。

ES 存储结构 与 MySQL 存储结构的对应关系。

ES存储结构MySQL存储结构
Index
Document
Field表字段
Mapping表结构定义

index

索引(index)类似 MySQL 的表,是文档(document)的集合。文档是 ES 中存储的一条数据,下面会详细介绍。

type

type 为文档类型,不过在 ES 7.0 以后的版本 已经废弃文档类型了,一个 index 中只有一个默认的 type,即 _doc。在 ES 老版本中文档类型代表一类文档的集合,index 类似 MySQL 的数据库,文档类型类似 MySQL 的表。既然 ES 新版本文档类型没什么作用了,那么 index(索引)就类似 MySQL 表的概念,ES 没有数据库的概念了。

document

ES 是面向文档的数据库,文档是 ES 存储的最基本的存储单元,文档类似 MySQL 表中的一行数据。在 ES 中,文档使用 JSON 格式存储,因此存储上要比 MySQL 灵活很多,因为 ES 支持任意格式的 JSON 数据。

{
  "_index" : "order",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 10000,
    "status" : 0,
    "total_price" : 10000,
    "create_time" : "2020-09-06 17:30:22",
    "user" : {
      "id" : 10000,
      "username" : "asong2020",
      "phone" : "888888888",
      "address" : "深圳人才区"
    }
  }
}

其中 _source 为记录的具体内容,其他字段为文档的元数据,是插入 JSON 记录时 ES 自动生成的系统字段,二者共同组成一个 document。

常用的元数据有:

  • _index:代表当前文档所属索引
  • _type:代表当前文档所属类型(ES 7.0 以后废弃了 type 用法,但是元数据还是可以看到的)
  • _id:文档唯一 ID,如果没有为文档指定 ID 则自动生成。
  • _source:文档的原始 JSON 数据
  • _version:文档的版本号,每修改一次文档,字段就会加 1,这个字段新版 ES 也给取消了
  • _seq_no:文档编号,每修改一次文档,字段就会加 1,替代老的 version。注意 seq_no 递增属于整个 index,而不是单个文档
  • _primary_term:文档所在主分区,这个可以跟 seq_no 搭配实现乐观锁并发控制,以防止旧版本的文档覆盖较新的文档

field

文档由多个 JSON 字段组成,字段跟 MySQL 中表的字段类似,常用字段类型有:

  • 数值类型(long、integer、short、byte、double、float、unsigned_long)
  • date 日期类型
  • boolean 布尔类型
  • text 文本类型,支持全文搜索
  • keyword 关键词类型,不支持全文搜索。如 phone 这种数据,用一个整体进行匹配就 ok 了,不需要进行分词处理
  • geo 这里主要用于地理信息检索、多边形区域的表达

mapping

mapping 类似于 MySQL 的表结构体定义,每个索引都有一个映射的规则,我们可以通过定义索引的映射规则,提前定义好文档的 JSON 结构和字段类型。如果没有定义索引的 mapping,ES 会在写入数据的时候,根据我们写入的数据字段推测出对应的字段类型,相当于自动定义索引的 mapping 。

注意: ES 的自动映射很方便,但是实际业务中,对于关键字段类型,通常预先定义好,这样可以避免 ES 自动生成不是你想要的字段类型。

5.其他重要概念

除了数据结构的相关概念,因 ES 是一个分布式支持水平扩展的数据库系统,必然少不了分布式相关的概念,这个最好也需要了解一下。

cluster

一个集群由一个或多个节点组成,它们共同持有数据,一起提供存储搜索功能。

集群由一个唯一的名字进行区分,默认为"elasticsearch",集群中的节点通过整个唯一的名字加入集群。

node

节点是 ES 集群的一部分,只要多个节点在同个网络中,节点就可以通过指定集群的名称加入其中,与集群中的其他节点相互感知。

和集群类似,一个节点也是由一个名字来标识的。默认情况下,这个名字是一个随机的漫威漫画角色名字,这个名字会在启动的时候赋予节点。这个名字对于管理工作来说挺重要的,因为在这个管理过程中,你会去确定网络中的哪些服务器对应于 ES 集群中的哪些节点。

shards

索引可以存储大量的数据,这些数据可能超过单个节点的硬件限制。为了解决这一问题,ES 提供细分索引的能力,即分片(shards)。

ES 可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上,构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。

至于一个分片怎样分布,它的文档怎样聚合回搜索请求,完全由 ES 管理,对于作为用户来说,这些都是透明的。

replicas

在一个网络环境里,节点故障随时都可能发生,在某个分片/节点出现故障时,有一个备份机制是非常有用的。为此 ES 允许你为分片创建一份或多份拷贝,这些拷贝叫做副本(replicas)。

副本之所以重要,主要有两方面的原因:一是提高系统的容错性,当某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高查询效率,ES 会自动对搜索请求进行负载均衡。

总之,每个索引可以被分成多个分片。一个索引也可以被复制 0 次(意思是没有复制)或多次。一旦复制了,每个索引就有了主分片和副分片(主分片的拷贝)。分片和复本的数量可以在索引创建的时候指定。在索引创建之后,你可以在任何时候动态地改变副本数量,但是不能改变分片的数量。

默认情况下,ES 中的每个索引被分为 5 个主分片和 1 份拷贝。不过从 7.x 版本开始,主分片由 5 改为了 1 个。如果你的集群中至少有两个节点,你的索引将会有 5 个主分片和另外 5 个副分片,这样的话每个索引总共就有 10 个分片。一个索引的多个分片可以存放在集群中的一台主机上,也可以分散存放在多台主机上,这取决于你的集群机器数量。主分片和副分片的具体位置是由 ES 内在的策略所决定的。

6.客户端库

ES 的 Go 客户端较为流行的有 Elastic 公司官方库go-elasticsearch 和第三方库 olivere/elastic,后者较为流行。

因项目中使用的是 olivere/elastic/v7,所以本文将介绍通过该库完成对 ES 的增删改查。

注意,ES 不同版本需要使用对应版本的 olivere/elastic 包,对应关系如下:

Elasticsearch versionElastic versionPackage URLRemarks
7.x7.0github.com/olivere/elastic/v7Use Go modules
6.x6.0github.com/olivere/elasticUse a dependency manager
5.x5.0gopkg.in/olivere/elastic.v5Actively maintained

本次使用 ES 的 7.x 的版本,所以使用 github.com/olivere/elastic/v7 作为客户端库,使用 go.mod 来管理依赖:

require(
	github.com/olivere/elastic/v7 v7.0.24
)

7.创建客户端

前面铺垫了这么多基础概念,下面正式开始 Go ES 的增删改查。

在开始实战之前,先介绍一下本文代码示例要实现的功能:

  • 添加用户信息
  • 更新用户信息
  • 删除用户信息
  • 根据 ID 查询单个用户
  • 根据用户信息分页查询相关用户

在进行开发之前,需要创建一个 client,用于操作 ES。这里使用单例模式来实现。

// ES 客户端
var (
	esOnce sync.Once
	esCli  *elastic.Client
)

// GetESClient 获取 ES client
func GetESClient() *elastic.Client {
	if esCli != nil {
		return esCli
	}

	esOnce.Do(func() {
		cli, err := elastic.NewSimpleClient(
			elastic.SetURL("http://test.es.db"),    // 服务地址
			elastic.SetBasicAuth("user", "secret"), // 账号密码
			elastic.SetErrorLog(log.New(os.Stderr, "", log.LstdFlags)), 	// 设置错误日志输出
			elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),  	// 设置info日志输出
		)
		if err != nil {
			panic("new es client failed, err=" + err.Error())
		}
		esCli = cli
	})
	return esCli
}

这里创建 ES client 是使用的 NewSimpleClient() 这个方法进行实现的,当然也可以使用另外两个方法:

// NewClient creates a new client to work with Elasticsearch.
func NewClient(options ...ClientOptionFunc) (*Client, error)

// NewClientFromConfig initializes a client from a configuration
func NewClientFromConfig(cfg *config.Config) (*Client, error)

创建时可以提供 ES 连接参数。上面列举的不全,下面给大家介绍一下。

elastic.SetURL(url) 用来设置ES服务地址,如果是本地,就是127.0.0.1:9200。支持多个地址,用逗号分隔即可
elastic.SetBasicAuth("user", "secret") 这个是基于http base auth 验证机制的账号密码
elastic.SetGzip(true) 启动 gzip 压缩
elastic.SetHealthcheckInterval(10*time.Second) 用来设置监控检查时间间隔
elastic.SetMaxRetries(5) 设置请求失败最大重试次数,v7 版本以后已被弃用
elastic.SetSniff(false) 设置是否定期检查集群(默认为true)
elastic.SetErrorLog(log.New(os.Stderr, " ", log.LstdFlags)) 设置错误日志输出
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)) 设置info日志输出

8.创建 index

上一步,我们创建了 client,接下来我们创建 index 和对应的 mapping。mapping 用于描述 document 的 JSON 结构和字段类型。

设计一个 mapping 用来描述我们要存储的用户信息,mapping 也用一个 JSON 串来表示。

ESMapping = `{
	"mappings":{
		"dynamic": "strict",
		"properties":{
			"id": 				{ "type": "long" },
			"username": 		{ "type": "keyword" },
			"nickname":			{ "type": "text" },
			"phone":			{ "type": "keyword" },
			"age":				{ "type": "long" },
			"ancestral":		{ "type": "text" },
			"identity":         { "type": "text" },
			"update_time":		{ "type": "long" },
			"create_time":		{ "type": "long" }
			}
		}
	}`

一般的,mapping 可以分为动态映射(dynamic mapping)和静态(显式)映射(explicit mapping)和精确(严格)映射(strict mappings),具体由 dynamic 属性控制。

其中 "dynamic": "strict" 表示字段需要严格匹配,新增或类型不一致写入将会报错。

索引名称定义为:index = es_index_userinfo

设计好了 index 及 mapping 后,我们开始编写代码进行创建:

// ESIndexExists 索引是否存在
func ESIndexExists(ctx context.Context, index string) (bool, error) {
	return GetESClient().IndexExists(index).Do(ctx)
}

// CrtESIndex 创建 ES 索引
func CrtESIndex(ctx context.Context, index, mapping string) error {
	exist, err := ESIndexExists(ctx, index)
	if err != nil {
		return err
	}
	// 已经创建
	if exist {
		return nil
	}
	// 重复创建会报错
	_, err = GetESClient().CreateIndex(index).BodyString(mapping).Do(ctx)
	return err
}

因为重复创建 index,ES 会报错,所以创建前先判断一下是否已经创建。

创建成功后,我们在 Kibana 上通过 Restful API 可以查看到刚刚创建的 index。

GET /es_index_userinfo


其中 number_of_shards 为主分片数,缺省为 1,只能在创建索引时指定,后期无法修改。number_of_replicas 是指每个分片有多少个副本,后期可以动态修改。

对应的 RESTful API 为:

PUT /es_index_userinfo
{
  "mappings":{
		"dynamic": "strict",
		"properties":{
			"id": 				{ "type": "long" },
			"username": 		{ "type": "keyword" },
			"nickname":			{ "type": "text" },
			"phone":			{ "type": "keyword" },
			"age":				{ "type": "long" },
			"ancestral":		{ "type": "text" },
			"identity":         { "type": "text" },
			"update_time":		{ "type": "long" },
			"create_time":		{ "type": "long" }
		}
	}
}

9.增加

创建完 index,便可以向 index 中添加记录。

// Create2ES 添加记录到 ES
func Create2ES(ctx context.Context, index, id, json string) error {
	_, err := GetESClient().Index().Index(index).OpType("create").Id(id).BodyJson(json).Refresh("wait_for").Do(ctx)
	return err
}

index 为索引,id 唯一标识一条记录,也就是 document,json 为 JSON 格式的数据,即 document 原始 JSON 数据。wait_for 表示等待发生刷盘,这样写入的数据才可以搜索到。ES 自动刷新发生变更的每个索引的分片,刷新间隔为index.refresh_interval,缺省为 1 秒。

关于 refresh 的取值官方说明如下:

Empty string or true
Refresh the relevant primary and replica shards (not the whole index) immediately after the operation occurs, so that the updated document appears in search results immediately. This should ONLY be done after careful thought and verification that it does not lead to poor performance, both from an indexing and a search standpoint.

wait_for
Wait for the changes made by the request to be made visible by a refresh before replying. This doesn’t force an immediate refresh, rather, it waits for a refresh to happen. Elasticsearch automatically refreshes shards that have changed every index.refresh_interval which defaults to one second. That setting is dynamic. Calling the Refresh API or setting refresh to true on any of the APIs that support it will also cause a refresh, in turn causing already running requests with refresh=wait_for to return.

false (the default)
Take no refresh related actions. The changes made by this request will be made visible at some point after the request returns.

注意:重复创建会报elastic: Error 409 (Conflict)错误。

写入成功后,通过 RESTful API 在 Kibana 查看到刚刚写入的文档。

GET /es_index_userinfo/_doc/1


_doc 为文档类型,1 为文档 ID。

当然,我们也可以通过 RESTful API 写入文档。

PUT /es_index_userinfo/_doc/1
{
	"id":	1,
	"username":	"alice",
	"nickname": "cat",
	"phone":18819994334,
	"age":	18,
	"ancestral": "安徽",
	"identity":"12345678",
	"update_time":1627522828,
	"create_time":1627522828
}

10.删除

可以根据文档 ID 删除对应的文档。

// Delete4ES 通过 ID 删除文档
func Delete4ES(ctx context.Context, index, id string) error {
	_, err := GetESClient().Delete().Index(index).Id(id).Refresh("wait_for").Do(ctx)
	return err
}

注意:重复删除会报elastic: Error 404 (Not Found)错。

对应 RESTful API 为:

DELETE /es_index_userinfo/_doc/1?refresh=true

我们也可以根据条件来删除符合条件的文档,即 Delete by Query。

// DeleteByQuery4ES 根据条件删除文档
// param: index 索引; query 条件
// ret: 删除的文档数; error
func DeleteByQuery4ES(ctx context.Context, index string, query elastic.Query) (int64, error) {
	rsp, err := GetESClient().DeleteByQuery(index).Query(query).Refresh("true").Do(ctx)
	if err != nil {
		return 0, err
	}
	return rsp.Deleted, nil
}

注意:Refresh 只能指定 true 或 false(缺省值),不能指定 wait_for。

比如我们删除昵称为 cat 且年龄小于等于18 的用户。

query := elastic.NewBoolQuery()
query.Filter(elastic.NewTermQuery("nickname", "cat"))
query.Filter(elastic.NewRangeQuery("age").Lte(18))
ret, err := DeleteByQuery4ES(context.Background(), index, query)

对应的 RESTful api 为:

POST /es_index_userinfo/_delete_by_query?refresh=true
{
  "query":{
     "bool":{
       "filter":[
			{"term" : {"nickname" : "cat"}},
			{"range" : {"age" : {"lte" : 18}}}
		]
     }
  }
}

11.修改

可以根据文档 ID 更新对应的文档。

// Update2ES 修改文档
// param: index 索引; id 文档ID; m 待更新的字段键值结构
func Update2ES(ctx context.Context, index, id string, m map[string]interface{}) error {
	_, err := GetESClient().Update().Index(index).Id(id).Doc(m).Refresh("wait_for").Do(ctx)
	return err
}

注意:修改不存在的文档将报elastic: Error 404 (Not Found)错误。

比如修改文档 ID 为 1 的用户名改为 “jack”。

err := Update2ES(context.Background(), index, "1", map[string]interface{}{"username": "jack"})

对应的 RESTful api 为:

POST /es_index_userinfo/_update/1?refresh=wait_for
{
   "doc": {
    "username": "jack"
  }
}

我们也可以根据条件来更新符合条件的文档,即 Update by Query。

// UpdateByQuery2ES 根据条件修改文档
// param: index 索引; query 条件; script 脚本指定待更新的字段与值
func UpdateByQuery2ES(ctx context.Context, index string, query elastic.Query, script *elastic.Script) (int64, error) {
	rsp, err := GetESClient().UpdateByQuery(index).Query(query).Script(script).Refresh("true").Do(ctx)
	if err != nil {
		return 0, err
	}
	return rsp.Updated, nil
}

注意:Refresh 只能指定 true 或 false(缺省值),不能指定 wait_for。

比如我将更新用户名为 alice,年龄小于等于 18 岁的用户昵称和祖籍。

query := elastic.NewBoolQuery()
query.Filter(elastic.NewTermQuery("username", "alice"))
query.Filter(elastic.NewRangeQuery("age").Lte(18))
script := elastic.NewScriptInline("ctx._source.nickname=params.nickname;ctx._source.ancestral=params.ancestral").Params(
	map[string]interface{}{
		"nickname":  "cat",
		"ancestral": "安徽",
	})
ret, err := UpdateByQuery2ES(context.Background(), index, query, script)

对应的 RESTful api 为:

POST /es_index_userinfo/_update_by_query?refresh=true
{
  "query":{
     "bool":{
       "filter":[
         {"term":{"username":"alice"}},
         {"range" : {"age" : {"lte" : 18}}}
         ]
     }
  },
  "script": {
    "source": "ctx._source['nickname'] = 'cat';ctx._source['ancestral'] ='安徽'"
  }
}

12.查询

搜索是 ES 最为复杂精妙的地方,这里只示例项目中较为常用的查询。

ES 中的条件查询常用的有如下几种:

  • TermQuery 精确匹配单个字段
  • TermsQuery 精确匹配单个字段,但使用多值进行匹配,类似于 SQL 中的 in 操作
  • MatchQuery 单个字段全文搜索(匹配分词结果,不需要全文匹配)
  • RangeQuery 范围查询
  • BoolQuery 组合查询

12.1 根据 ID 查询

根据文档ID获取单个文档信息。

// GetByID4ES 根据ID查询单个文档
func GetByID4ES(ctx context.Context, index, id string) (string, error) {
	res, err := GetESClient().Get().Index(index).Id(以上是关于Go Elasticsearch CRUD 快速入门的主要内容,如果未能解决你的问题,请参考以下文章

Go Elasticsearch index CRUD

golang elasticsearch 文档操作(CRUD) --- 2022-04-02

SpringBoot+ElasticSearch 实现模糊查询,批量CRUD,排序,分页,高亮

SpringBoot ElasticSearch 实现模糊查询,批量CRUD,排序,分页,高亮

SpringBoot集成ElasticSearch,实现模糊查询,批量CRUD,排序,分页,高亮

第一个CRUD的制作方法(Ruby on Rails 開發秘籍 | Ruby on Rails 快速入門)