30ES集成到项目中

Posted 无休止符

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了30ES集成到项目中相关的知识,希望对你有一定的参考价值。

目录

一、go语言中使用ES

1 - 使用第三方库

package main

import (
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
)

func main() 
	//初始化一个连接
	host := "http://192.168.124.51:9200"
	// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
	// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
	_, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
	if err != nil 
		panic(err)
	
	q := elastic.NewMatchQuery("address", "street")
	src, err := q.Source()
	if err != nil 
		panic(err)
	
	data, err := json.Marshal(src)
	got := string(data)
	fmt.Println(got)


2 - 解析出查询结果

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

func main() 
	//初始化一个连接
	host := "http://192.168.124.51:9200"
	// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
	// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
	client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
	if err != nil 
		panic(err)
	
	q := elastic.NewMatchQuery("address", "street")
	result, err := client.Search().Index("user").Query(q).Do(context.Background())
	if err != nil 
		panic(err)
	
	total := result.Hits.TotalHits.Value
	fmt.Printf("搜索结果数量:%d\\n", total)
	for _, value := range result.Hits.Hits 
		if jsonData, err := value.Source.MarshalJSON(); err == nil 
			fmt.Println(string(jsonData))
		 else 
			panic(err)
		
	


3 - es对象转换为struct

  • 定义struct并使用go自带的json进行转换
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Account struct 
	AccountNum int32  `json:"account_number"`
	FirstName  string `json:"firstname"`


func main() 
	//初始化一个连接
	host := "http://192.168.124.51:9200"
	// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
	// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
	client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false))
	if err != nil 
		panic(err)
	
	q := elastic.NewMatchQuery("address", "street")
	result, err := client.Search().Index("user").Query(q).Do(context.Background())
	if err != nil 
		panic(err)
	
	total := result.Hits.TotalHits.Value
	fmt.Printf("搜索结果数量:%d\\n", total)
	for _, value := range result.Hits.Hits 
		account := Account
		_ = json.Unmarshal(value.Source, &account)
		fmt.Println(account)
	


4 - 向es中添加数据

  • 添加数据同时开启es的日志打印
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
	"log"
	"os"
)

type Account struct 
	AccountNum int32  `json:"account_number"`
	FirstName  string `json:"firstname"`


func main() 
	//初始化一个连接
	host := "http://192.168.124.51:9200"
	logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
	// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
	// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
	client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
		elastic.SetTraceLog(logger))
	if err != nil 
		panic(err)
	
	q := elastic.NewMatchQuery("address", "street")
	result, err := client.Search().Index("user").Query(q).Do(context.Background())
	if err != nil 
		panic(err)
	
	total := result.Hits.TotalHits.Value
	fmt.Printf("搜索结果数量:%d\\n", total)
	for _, value := range result.Hits.Hits 
		account := Account
		_ = json.Unmarshal(value.Source, &account)
		fmt.Println(account)
	

	account := AccountAccountNum: 15468, FirstName: "immooc bobby"
	put1, err := client.Index().Index("myuser").BodyJson(account).Do(context.Background())
	if err != nil 
		panic(err)
	
	fmt.Printf("Indexed myuser %s to index %s, type %s \\n", put1.Id, put1.Index, put1.Type)


5 - 新建mapping

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic/v7"
	"log"
	"os"
)

const goodsMapping = `
	"mappings" : 
		"properties" : 
			"name" : 
				"type" : "text",
				"analyzer":"ik_max_word"
			,
			"id" : 
				"type" : "integer"
			
		
	
`

type Account struct 
	AccountNum int32  `json:"account_number"`
	FirstName  string `json:"firstname"`


func main() 
	//初始化一个连接
	host := "http://192.168.124.51:9200"
	logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
	// 这里必须将sniff设置为false,因为使用olivere/elastic连接elasticsearch时,发现连接地址命名输入的时候是地址
	// 但是连接时会自动转换成内网地址或者docker中的ip地址,导致服务连接不上
	client, err := elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
		elastic.SetTraceLog(logger))
	if err != nil 
		panic(err)
	
	q := elastic.NewMatchQuery("address", "street")
	result, err := client.Search().Index("user").Query(q).Do(context.Background())
	if err != nil 
		panic(err)
	
	total := result.Hits.TotalHits.Value
	fmt.Printf("搜索结果数量:%d\\n", total)
	for _, value := range result.Hits.Hits 
		account := Account
		_ = json.Unmarshal(value.Source, &account)
		fmt.Println(account)
	

	//account := AccountAccountNum: 15468, FirstName: "immooc bobby"
	//put1, err := client.Index().Index("myuser").BodyJson(account).Do(context.Background())
	//if err != nil 
	//	panic(err)
	//
	//fmt.Printf("Indexed myuser %s to index %s, type %s \\n", put1.Id, put1.Index, put1.Type)

	createIndex, err := client.CreateIndex("mygoods").BodyString(goodsMapping).Do(context.Background())
	if err != nil 
		panic(err)
	
	if !createIndex.Acknowledged 

	




二、项目中集成ES

1 - 集成es接口分析

  • 对于商品操作来说:需要同步操作es的接口
    • 搜索、添加、更新、删除:这些都需要将数据同步到es的数据中
  • 将es的集成在srv层还是web层:应该将es集成到srv层,因为有可能出现mysql保存成功了,但是es保存失败了,这时候如果是在web层无法完成数据的回滚,这样就省去了微服务的事务问题
  • 使用es的目的:主要目的是搜索出商品的id来,通过id拿到具体的字段信息是通过mysql来完成的
  • 是否需要将所有的mysql字段在es中保存一份:实际开发中,我们一般只把搜索和用来过滤的字段信息保存到es中
  • es也可以来当做mysql使用
    • 但是实际上mysql和es之间是互补的关系;
    • mysql是关系数据库,es是文档数据库,各有各的优缺点;
    • 一般mysql用来做存储使用,es用来做搜索使用
  • es的性能提升
    • 想要提升es的性能,需要将es的内存设置的够大;
    • 但是实际上es可以设置的内存也是有上限的,所以我们不在es中保存不必要的字段,可以提高es的内存使用率,同样的内存可以载入的文档数量就会多一些,从另外的角度上来说也可以提高es的性能

2 - 建立商品对应的struct和mapping

  • goods_srv/model/es_goods.go
package model

type EsGoods struct 
	ID         int32 `json:"id"`
	CategoryID int32 `json:"category_id"`
	BrandsID   int32 `json:"brands_id"`
	OnSale     bool  `json:"on_sale"`
	ShipFree   bool  `json:"ship_free"`
	IsNew      bool  `json:"is_new"`
	IsHot      bool  `json:"is_hot"`

	Name        string  `json:"name"`
	ClickNum    int32   `json:"click_num"`
	SoldNum     int32   `json:"sold_num"`
	FavNum      int32   `json:"fav_num"`
	MarketPrice float32 `json:"market_price"`
	GoodsBrief  string  `json:"goods_brief"`
	ShopPrice   float32 `json:"shop_price"`


func (EsGoods) GetIndexName() string 
	return "goods"


func (EsGoods) GetMapping() string 
	goodsMapping := `
	
		"mappings" : 
			"properties" : 
				"brands_id" : 
					"type" : "integer"
				,
				"category_id" : 
					"type" : "integer"
				,
				"click_num" : 
					"type" : "integer"
				,
				"fav_num" : 
					"type" : "integer"
				,
				"id" : 
					"type" : "integer"
				,
				"is_hot" : 
					"type" : "boolean"
				,
				"is_new" : 
					"type" : "boolean"
				,
				"market_price" : 
					"type" : "float"
				,
				"name" : 
					"type" : "text",
					"analyzer":"ik_max_word"
				,
				"goods_brief" : 
					"type" : "text",
					"analyzer":"ik_max_word"
				,
				"on_sale" : 
					"type" : "boolean"
				,
				"ship_free" : 
					"type" : "boolean"
				,
				"shop_price" : 
					"type" : "float"
				,
				"sold_num" : 
					"type" : "long"
				
			
		
	`
	return goodsMapping


3 - nacos新增es配置


  "name": "goods_srv",
  "host": "192.168.124.9",
  "tags": ["imooc", "bobby", "goods", "srv"],
  "mysql": 
    "host": "192.168.124.51",
    "port": 3306,
    "user": "root",
    "password": "jiushi",
    "db": "mxshop_goods_srv"
  ,
  "consul": 
    "host": "192.168.124.51",
    "port": 8500
  ,
  "es": 
    "host": "192.168.124.51",
    "port": 9200
  

  • goods_srv/config/config.go:新增EsConfig结构体
package config

type MysqlConfig struct 
	Host     string `mapstructure:"host" json:"host"`
	Port     int    `mapstructure:"port" json:"port"`
	Name     string `mapstructure:"db" json:"db"`
	User     string `mapstructure:"user" json:"user"`
	Password string `mapstructure:"password" json:"password"`


type ConsulConfig struct 
	Host string `mapstructure:"host" json:"host"`
	Port int    `mapstructure:"port" json:"port"`


type EsConfig struct 
	Host string `mapstructure:"host" json:"host"`
	Port int    `mapstructure:"port" json:"port"`


type ServerConfig struct 
	Name       string       `mapstructure:"name" json:"name"`
	Host       string       `mapstructure:"host" json:"host"`
	Tags       []string     `mapstructure:"tags" json:"tags"`
	MysqlInfo  MysqlConfig  `mapstructure:"mysql" json:"mysql"`
	ConsulInfo ConsulConfig `mapstructure:"consul" json:"consul"`
	EsInfo     EsConfig     `mapstructure:"es" json:"es"`


type NacosConfig struct 
	Host      string `mapstructure:"host"`
	Port      uint64 `mapstructure:"port"`
	Namespace string `mapstructure:"namespace"`
	User      string `mapstructure:"user"`
	Password  string `mapstructure:"password"`
	DataId    string `mapstructure:"dataid"`
	Group     string `mapstructure:"group"`


  • goods_srv/global/global.go:添加EsClient对象
package global

import (
	"github.com/olivere/elastic/v7"
	"gorm.io/gorm"
	"nd/goods_srv/config"
)

var (
	DB           *gorm.DB
	ServerConfig config.ServerConfig
	NacosConfig  config.NacosConfig
	EsClient     *elastic.Client
)

4 - 初始化es

  • goods_srv/initialize/init_es.go
package initialize

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"log"
	"nd/goods_srv/global"
	"nd/goods_srv/model"
	"os"
)

func InitEs() 
	//初始化连接
	host := fmt.Sprintf("http://%s:%d", global.ServerConfig.EsInfo.Host, global.ServerConfig.EsInfo.Port)
	logger := log.New(os.Stdout, "mxshop", log.LstdFlags)
	var err error
	global.EsClient, err = elastic.NewClient(elastic.SetURL(host), elastic.SetSniff(false),
		elastic.SetTraceLog(logger))
	if err != nil 
		panic(err)
	

	//新建mapping和index
	exists, err := global.EsClient.IndexExists(model.EsGoods.GetIndexName()).Do(context.Background())
	if err != nil 
		panic(err)
	
	if !exists  // 不存在的时候才需要新建mapping
		_, err = global.EsClient.CreateIndex(model.EsGoods.GetIndexName()).BodyString(model.EsGoods.GetMapping()).Do(context.Background())
		if err != nil 
			panic(err)
		
	


  • goods_srv/main.go:添加es的初始化调用
func main() 
	IP := flag.String("ip", "0.0.0.0", "ip地址")
	Port := flag.Int("port", 50058, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0

	//初始化
	initialize.InitLogger()
	initialize.InitConfig()
	initialize.InitDB()
	initialize.InitEs()
	zap.S().Info(global.ServerConfig)
	//省略。。。
  • 启动查看

5 - 同步已经的mysql数据到es中

  • goods_srv/model/main/main.go
package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	"gorm.io/gorm/schema"
	"log"
	"nd/goods_srv/global"
	"nd/goods_srv/initialize"
	"nd/goods_srv/model"
	"os"
	"strconv"
	"time"
)

func main() 
	/*	initialize.InitConfig()
		dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_goods_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)

		newLogger := logger.New(
			log.New(os.Stdout, "\\r\\n", log.LstdFlags), // io writer
			logger.Config
				SlowThreshold: time.Second, // 慢 SQL 阈值
				LogLevel:      logger.Info, // Log level
				Colorful:      true,        // 禁用彩色打印
			,
		)

		// 全局模式
		db, err := gorm.Open(mysql.Open(dsn), &gorm.Config
			NamingStrategy: schema.NamingStrategy
				SingularTable: true,
			,
			Logger: newLogger,
		)
		if err != nil 
			panic(err)
		

		_ = db.AutoMigrate(&model.Category,
			&model.Brands, &model.GoodsCategoryBrand, &model.Banner, &model.Goods)*/
	Mysql2Es()


// Mysql2Es 将之前mysql的goods数据保存到es中
func Mysql2Es() 
	initialize.InitConfig()
	dsn := fmt.Sprintf("root:jiushi@tcp(%s:3306)/mxshop_goods_srv?charset=utf8mb4&parseTime=True&loc=Local", global.ServerConfig.MysqlInfo.Host)

	newLogger := logger.New(
		log.New(os.Stdout, "\\r\\n", log.LstdFlags), // io writer
		logger.Config
			SlowThreshold以上是关于30ES集成到项目中的主要内容,如果未能解决你的问题,请参考以下文章

ElasticSearch_08_SpringBoot集成ES

ElasticSearch_07_SpringBoot集成ES

Elasticsearch 写入优化,从 3000 到 8000/s,让你的 ES 飞起来。。。

StringBoot集成Elasticsearch

ElaticSearch 学习三(springboot 集成ES 7.6.1)

Hive 集成 ElasticSearch