Go语言实战 (17) gRPC 集成 ETCD 进行服务发现

Posted 小生凡一

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go语言实战 (17) gRPC 集成 ETCD 进行服务发现相关的知识,希望对你有一定的参考价值。

文章目录

写在前面

上一篇文章 gRPC集成ETCD进行服务注册 中,我们已经知道了如何对集合ETCD进行注册。这一次我们继续下一步,对ETCD进行服务的发现。

1. 构造发现结构体

  • 构造服务发现结构体
type Resolver struct 
	schema      string
	EtcdAddrs   []string
	DialTimeout int

	closeCh      chan struct
	watchCh      clientv3.WatchChan
	cli          *clientv3.Client
	keyPrifix    string
	srvAddrsList []resolver.Address

	cc     resolver.ClientConn
	logger *logrus.Logger

  • 创建一个基于ETCD的Resolver
func NewResolver(etcdAddrs []string, logger *logrus.Logger) *Resolver 
	return &Resolver
		schema:      schema,
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	

2. 实现机制

  • 开始进行服务发现机制
func (r *Resolver) start() (chan<- struct, error) 
	var err error
	r.cli, err = clientv3.New(clientv3.Config
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	)
	if err != nil 
		return nil, err
	
	resolver.Register(r)

	r.closeCh = make(chan struct)

	if err = r.sync(); err != nil 
		return nil, err
	

	go r.watch()

	return r.closeCh, nil

  • watch机制的实现
func (r *Resolver) watch() 
	ticker := time.NewTicker(time.Minute)
	r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())

	for 
		select 
		case <-r.closeCh:
			return
		case res, ok := <-r.watchCh:
			if ok 
				r.update(res.Events)
			
		case <-ticker.C:
			if err := r.sync(); err != nil 
				r.logger.Error("sync failed", err)
			
		
	


  • 更新节点操作
func (r *Resolver) update(events []*clientv3.Event) 
	for _, ev := range events 
		var info Server
		var err error

		switch ev.Type 
		case clientv3.EventTypePut:
			info, err = ParseValue(ev.Kv.Value)
			if err != nil 
				continue
			
			addr := resolver.AddressAddr: info.Addr, Metadata: info.Weight
			if !Exist(r.srvAddrsList, addr) 
				r.srvAddrsList = append(r.srvAddrsList, addr)
				r.cc.UpdateState(resolver.StateAddresses: r.srvAddrsList)
			
		case clientv3.EventTypeDelete:
			info, err = SplitPath(string(ev.Kv.Key))
			if err != nil 
				continue
			
			addr := resolver.AddressAddr: info.Addr
			if s, ok := Remove(r.srvAddrsList, addr); ok 
				r.srvAddrsList = s
				r.cc.UpdateState(resolver.StateAddresses: r.srvAddrsList)
			
		
	


  • 同步获取所有地址信息
func (r *Resolver) sync() error 
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
	if err != nil 
		return err
	
	r.srvAddrsList = []resolver.Address

	for _, v := range res.Kvs 
		info, err := ParseValue(v.Value)
		if err != nil 
			continue
		
		addr := resolver.AddressAddr: info.Addr, Metadata: info.Weight
		r.srvAddrsList = append(r.srvAddrsList, addr)
	
	r.cc.UpdateState(resolver.StateAddresses: r.srvAddrsList)
	return nil


以上是关于Go语言实战 (17) gRPC 集成 ETCD 进行服务发现的主要内容,如果未能解决你的问题,请参考以下文章

Go语言实战 (15) Gin+gRPC 微服务实现备忘录 (下) | 备忘录模块

Go开发gRPC服务

ETCD:HTTP JSON API通过gRPC网关

Go语言实战 (14) Gin+gRPC 微服务实现备忘录 (上) | 用户模块

Go语言实战 gRPC 实现一个简单微服务

Go语言实战 gRPC 实现一个简单微服务