30ES集成到项目中
Posted 无休止符
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了30ES集成到项目中相关的知识,希望对你有一定的参考价值。
目录
一、go语言中使用ES
1 - 使用第三方库
- github上搜索:go elasticsearch;我们会使用第三方的,因为第三方的使用会比官方的简单一些
- olivere/elastic:https://github.com/olivere/elastic
- olivere/elastic文档地址:https://olivere.github.io/elastic/
- 简单使用
package main
import (
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
)
func main()
//初始化一个连接
host := "http://192.168.124.51:9200"
// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
_, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
if err != nil
panic(err)
q := elastic.NewMatchQuery("address", "street")
src, err := q.Source()
if err != nil
panic(err)
data, err := json.Marshal(src)
got := string(data)
fmt.Println(got)
2 - 解析出查询结果
package main
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)
func main()
//初始化一个连接
host := "http://192.168.124.51:9200"
// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
if err != nil
panic(err)
q := elastic.NewMatchQuery("address", "street")
result, err := client.Search().Index("user").Query(q).Do(context.Background())
if err != nil
panic(err)
total := result.Hits.TotalHits.Value
fmt.Printf("搜索结果数量:%d\\n", total)
for _, value := range result.Hits.Hits
if jsonData, err := value.Source.MarshalJSON(); err == nil
fmt.Println(string(jsonData))
else
panic(err)
3 - es对象转换为struct
- 定义struct并使用go自带的json进行转换
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
)
type Account struct
AccountNum int32 `json:"account_number"`
FirstName string `json:"firstname"`
func main()
//初始化一个连接
host := "http://192.168.124.51:9200"
// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
if err != nil
panic(err)
q := elastic.NewMatchQuery("address", "street")
result, err := client.Search().Index("user").Query(q).Do(context.Background())
if err != nil
panic(err)
total := result.Hits.TotalHits.Value
fmt.Printf("搜索结果数量:%d\\n", total)
for _, value := range result.Hits.Hits
account := Account
_ = json.Unmarshal(value.Source, &account)
fmt.Println(account)
4 - 向es中添加数据
- 添加数据同时开启es的日志打印
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"log"
"os"
)
type Account struct
AccountNum int32 `json:"account_number"`
FirstName string `json:"firstname"`
func main()
//初始化一个连接
host := "http://192.168.124.51:9200"
logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
elastic.SetTraceLog(logger))
if err != nil
panic(err)
q := elastic.NewMatchQuery("address", "street")
result, err := client.Search().Index("user").Query(q).Do(context.Background())
if err != nil
panic(err)
total := result.Hits.TotalHits.Value
fmt.Printf("搜索结果数量:%d\\n", total)
for _, value := range result.Hits.Hits
account := Account
_ = json.Unmarshal(value.Source, &account)
fmt.Println(account)
account := AccountAccountNum: 15468, FirstName: "immooc bobby"
put1, err := client.Index().Index("myuser").BodyJson(account).Do(context.Background())
if err != nil
panic(err)
fmt.Printf("Indexed myuser %s to index %s, type %s \\n", put1.Id, put1.Index, put1.Type)
5 - 新建mapping
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
"log"
"os"
)
const goodsMapping = `
"mappings" :
"properties" :
"name" :
"type" : "text",
"analyzer":"ik_max_word"
,
"id" :
"type" : "integer"
`
type Account struct
AccountNum int32 `json:"account_number"`
FirstName string `json:"firstname"`
func main()
//初始化一个连接
host := "http://192.168.124.51:9200"
logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
elastic.SetTraceLog(logger))
if err != nil
panic(err)
q := elastic.NewMatchQuery("address", "street")
result, err := client.Search().Index("user").Query(q).Do(context.Background())
if err != nil
panic(err)
total := result.Hits.TotalHits.Value
fmt.Printf("搜索结果数量:%d\\n", total)
for _, value := range result.Hits.Hits
account := Account
_ = json.Unmarshal(value.Source, &account)
fmt.Println(account)
//account := AccountAccountNum: 15468, FirstName: "immooc bobby"
//put1, err := client.Index().Index("myuser").BodyJson(account).Do(context.Background())
//if err != nil
// panic(err)
//
//fmt.Printf("Indexed myuser %s to index %s, type %s \\n", put1.Id, put1.Index, put1.Type)
createIndex, err := client.CreateIndex("mygoods").BodyString(goodsMapping).Do(context.Background())
if err != nil
panic(err)
if !createIndex.Acknowledged
二、项目中集成ES
1 - 集成es接口分析
- 对于商品操作来说:需要同步操作es的接口
- 搜索、添加、更新、删除:这些都需要将数据同步到es的数据中
- 将es的集成在srv层还是web层:应该将es集成到srv层,因为有可能出现mysql保存成功了,但是es保存失败了,这时候如果是在web层无法完成数据的回滚,这样就省去了微服务的事务问题
- 使用es的目的:主要目的是搜索出商品的id来,通过id拿到具体的字段信息是通过mysql来完成的
- 是否需要将所有的mysql字段在es中保存一份:实际开发中,我们一般只把搜索和用来过滤的字段信息保存到es中
- es也可以来当做mysql使用:
- 但是实际上mysql和es之间是互补的关系;
- mysql是关系数据库,es是文档数据库,各有各的优缺点;
- 一般mysql用来做存储使用,es用来做搜索使用
- es的性能提升:
- 想要提升es的性能,需要将es的内存设置的够大;
- 但是实际上es可以设置的内存也是有上限的,所以我们不在es中保存不必要的字段,可以提高es的内存使用率,同样的内存可以载入的文档数量就会多一些,从另外的角度上来说也可以提高es的性能
2 - 建立商品对应的struct和mapping
- goods_srv/model/es_goods.go
package model
type EsGoods struct
ID int32 `json:"id"`
CategoryID int32 `json:"category_id"`
BrandsID int32 `json:"brands_id"`
OnSale bool `json:"on_sale"`
ShipFree bool `json:"ship_free"`
IsNew bool `json:"is_new"`
IsHot bool `json:"is_hot"`
Name string `json:"name"`
ClickNum int32 `json:"click_num"`
SoldNum int32 `json:"sold_num"`
FavNum int32 `json:"fav_num"`
MarketPrice float32 `json:"market_price"`
GoodsBrief string `json:"goods_brief"`
ShopPrice float32 `json:"shop_price"`
func (EsGoods) GetIndexName() string
return "goods"
func (EsGoods) GetMapping() string
goodsMapping := `
"mappings" :
"properties" :
"brands_id" :
"type" : "integer"
,
"category_id" :
"type" : "integer"
,
"click_num" :
"type" : "integer"
,
"fav_num" :
"type" : "integer"
,
"id" :
"type" : "integer"
,
"is_hot" :
"type" : "boolean"
,
"is_new" :
"type" : "boolean"
,
"market_price" :
"type" : "float"
,
"name" :
"type" : "text",
"analyzer":"ik_max_word"
,
"goods_brief" :
"type" : "text",
"analyzer":"ik_max_word"
,
"on_sale" :
"type" : "boolean"
,
"ship_free" :
"type" : "boolean"
,
"shop_price" :
"type" : "float"
,
"sold_num" :
"type" : "long"
`
return goodsMapping
3 - nacos新增es配置
"name": "goods_srv",
"host": "192.168.124.9",
"tags": ["imooc", "bobby", "goods", "srv"],
"mysql":
"host": "192.168.124.51",
"port": 3306,
"user": "root",
"password": "jiushi",
"db": "mxshop_goods_srv"
,
"consul":
"host": "192.168.124.51",
"port": 8500
,
"es":
"host": "192.168.124.51",
"port": 9200
- goods_srv/config/config.go:新增EsConfig结构体
package config
type MysqlConfig struct
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
Name string `mapstructure:"db" json:"db"`
User string `mapstructure:"user" json:"user"`
Password string `mapstructure:"password" json:"password"`
type ConsulConfig struct
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
type EsConfig struct
Host string `mapstructure:"host" json:"host"`
Port int `mapstructure:"port" json:"port"`
type ServerConfig struct
Name string `mapstructure:"name" json:"name"`
Host string `mapstructure:"host" json:"host"`
Tags []string `mapstructure:"tags" json:"tags"`
MysqlInfo MysqlConfig `mapstructure:"mysql" json:"mysql"`
ConsulInfo ConsulConfig `mapstructure:"consul" json:"consul"`
EsInfo EsConfig `mapstructure:"es" json:"es"`
type NacosConfig struct
Host string `mapstructure:"host"`
Port uint64 `mapstructure:"port"`
Namespace string `mapstructure:"namespace"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
DataId string `mapstructure:"dataid"`
Group string `mapstructure:"group"`
- goods_srv/global/global.go:添加EsClient对象
package global
import (
"github.com/olivere/elastic/v7"
"gorm.io/gorm"
"nd/goods_srv/config"
)
var (
DB *gorm.DB
ServerConfig config.ServerConfig
NacosConfig config.NacosConfig
EsClient *elastic.Client
)
4 - 初始化es
- goods_srv/initialize/init_es.go
package initialize
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
"log"
"nd/goods_srv/global"
"nd/goods_srv/model"
"os"
)
func InitEs()
//初始化连接
host := fmt.Sprintf("http://%s:%d", global.ServerConfig.EsInfo.Host, global.ServerConfig.EsInfo.Port)
logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
var err error
global.EsClient, err = elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
elastic.SetTraceLog(logger))
if err != nil
panic(err)
//新建mapping和index
exists, err := global.EsClient.IndexExists(model.EsGoods.GetIndexName()).Do(context.Background())
if err != nil
panic(err)
if !exists // 不存在的时候才需要新建mapping
_, err = global.EsClient.CreateIndex(model.EsGoods.GetIndexName()).BodyString(model.EsGoods.GetMapping()).Do(context.Background())
if err != nil
panic(err)
- goods_srv/main.go:添加es的初始化调用
func main()
IP := flag.String("ip", "0.0.0.0", "ip地址")
Port := flag.Int("port", 50058, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0
//初始化
initialize.InitLogger()
initialize.InitConfig()
initialize.InitDB()
initialize.InitEs()
zap.S().Info(global.ServerConfig)
//省略。。。
- 启动查看
5 - 同步已经的mysql数据到es中
- goods_srv/model/main/main.go
package main
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
"log"
"nd/goods_srv/global"
"nd/goods_srv/initialize"
"nd/goods_srv/model"
"os"
"strconv"
"time"
)
func main()
/* initialize.InitConfig()
dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_goods_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)
newLogger := logger.New(
log.New(os.Stdout, "\\r\\n", log.LstdFlags), // io writer
logger.Config
SlowThreshold: time.Second, // 慢 SQL 阈值
LogLevel: logger.Info, // Log level
Colorful: true, // 禁用彩色打印
,
)
// 全局模式
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config
NamingStrategy: schema.NamingStrategy
SingularTable: true,
,
Logger: newLogger,
)
if err != nil
panic(err)
_ = db.AutoMigrate(&model.Category,
&model.Brands, &model.GoodsCategoryBrand, &model.Banner, &model.Goods)*/
Mysql2Es()
// Mysql2Es 将之前mysql的goods数据保存到es中
func Mysql2Es()
initialize.InitConfig()
dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_goods_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)
newLogger := logger.New(
log.New(os.Stdout, "\\r\\n", log.LstdFlags), // io writer
logger.Config
SlowThreshold以上是关于30ES集成到项目中的主要内容,如果未能解决你的问题,请参考以下文章
ElasticSearch_08_SpringBoot集成ES
ElasticSearch_07_SpringBoot集成ES
Elasticsearch 写入优化,从 3000 到 8000/s,让你的 ES 飞起来。。。