使用etcd实现Master的选举功能

Posted 平安与喜乐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用etcd实现Master的选举功能相关的知识,希望对你有一定的参考价值。

背景

说起master选举,最开始想到的可能就是zookeeper,但有些场景zookeeper的使用过于繁重和复杂,又由于etcd是基于Raft的分布式K/V存储,强一致性的K/V读写是核心。
所以造就了etcd可以用于master的选举的场景。

原理

etcd clientv3 concurrency对选举进行了封装

import github.com/coreos/etcd/clientv3/concurrency

上篇文章介绍了etcd使用txn实现分布式锁,而节点选举也要依靠Txn进行创建key的CAS操作
zookeeper是利用创建临时有序节点的方式,etcd也同样的是在prefix path下创建相应的key,并且key的revision也是有序。
同样也是通过watch机制进行通知。

应用

//封装
type etcdLeader struct 
	sess *concurrency.Session    //会话 客户端的租约会话,会做KeepAlive
	elec *concurrency.Election    //选举


func Campaign(ctx context.Context, client *clientv3.Client, prefix, value string) (*etcdLeader, error) 
	if value == "" 
		value = fmt.Sprintf("%s-%d", MyHostName, timestampMs())
	
	s, err := concurrency.NewSession(client)
	if err != nil 
		return nil, fmt.Errorf("failed to generate session: %v", err)
	
	prefix = "/openresty/" + "-concurrency/" + strings.TrimPrefix(prefix, "/")

	elec := concurrency.NewElection(s, prefix)
	return &etcdLeader
		sess: s,
		elec: elec,
	, elec.Campaign(ctx, value) // blocked until elected


//elec.Campaign 会参加选举,直到它被选中、发生错误或上下文被取消

// 调用elec.Proclaim 常用来检测当前主节点是否生效
func (l *etcdLeader) Proclaim(ctx context.Context, value string) error 
	if l.elec == nil 
		return fmt.Errorf("already closed")
	
	if value == "" 
		value = fmt.Sprintf("%s-%d", MyHostName, timestampMs())
	
	return l.elec.Proclaim(ctx, value)

//调用elec.Resign 退出master
func (l *etcdLeader) Resign(ctx context.Context) error 
	if l.elec == nil 
		return nil
	
	defer l.sess.Close()
	defer func() 
		l.elec = nil
		l.sess = nil
	()
	return l.elec.Resign(ctx)

测试

//模拟进程处理
func process(processID string) 
	cli, err := getEtcdCli("open")  //获得cli
	if err != nil 
		fmt.Println(err)
		panic(processID)
	
	ctx := context.Background()
	maintainer, err := Campaign(context.Background(), cli, "process", "")  //会阻塞,直到该节点选为master
	if err != nil 
		fmt.Println("process " + processID + " Campaign err" + err.Error())
		return
	
	fmt.Println(processID + "is maintainer")
	ticker := time.NewTicker(time.Second)
	count := 0
	defer ticker.Stop()
	for 
		select 
		case <-ctx.Done():
			err = maintainer.Resign(context.Background())  //超时退出
			if err != nil 
				fmt.Printf("error occured when resign lfe_sync: %v", err)
			
			return
		case <-ticker.C:
			err = maintainer.Proclaim(ctx, "") 
			if err != nil 
				fmt.Printf("error occured when proclaim lfe_sync: %v", err)
				maintainer.Resign(ctx)
				return
			
			count++
			//do sth
			fmt.Println("processID ", processID+" ", count)
			if count == 5       //主动退出
				err = maintainer.Resign(context.Background())
				if err != nil 
					fmt.Printf("error occured when resign lfe_sync: %v", err)
				
				return
			
		
	

总结与思考

如果从Campaign源码可以看出,比起上一篇利用txn实现分布式锁的实现,选主增加watch机制,减少轮训获取锁的过程。在本场景中选主和分布式锁的使用并没有什么差别,只是选主减少轮训,性能更好。
本次代码地址:https://github.com/zhaoshoucheng/hodgepodge/blob/main/etcd/master.go

Go语言(二十)日志采集项目Etcd的使用

日志采集项目(二)Etcd的使用

ETCD 介绍

  • 概念: 高可用的分布式key-value存储,实现配置共享和服务发现
  • 类似项目: zookeeper和consul
  • 开发语言: Go
  • 接口: 提供restful的http接口,使用简单
  • 实现算法: 基于raft算法的强一致性,高可用的服务存储目录

ETCD的应用场景

  • 服务发现和注册
  • 配置中心
  • 分布式锁
  • master选举

ETCD环境搭建

[root@centos7-node1 etcd]# nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 &        #启动etcd
  • etcdctl使用
[root@centos7-node1 ~]# cd /opt/application/etcd/
[root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" put /logagent/conf 333333
[root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" watch  /logagent/conf   
[root@centos7-node1 etcd]# ./etcdctl --endpoints "http://localhost:2379" del /logagent/conf
  • go实现watch功能
安装v3插件
go get go.etcd.io/etcd/clientv3

代码

package main

import (
   "context"
   "fmt"
   "go.etcd.io/etcd/clientv3"
   "time"
)

func main() {
   client,err := clientv3.New(clientv3.Config{
      Endpoints: []string{"192.168.56.11:2379"},
      DialTimeout: time.Second*3,
   })

   defer client.Close()
   fmt.Printf("conn succ
")
   for {
      resultChan := client.Watch(context.Background(),"/logagent/conf")
      for v := range resultChan{
         if v.Err() != nil {
            fmt.Printf("watch faild,err:%v
",err)
            continue
         }
         for _,e := range v.Events {
            fmt.Printf("event_type:%v,key:%v,val:%v
",e.Type,e.Kv.Key,string(e.Kv.Value))
         }
      }
   }
}
  • go 实现put功能
package main

import (
   "context"
   "fmt"
   "go.etcd.io/etcd/clientv3"
   "time"
)

func main() {
   client,err := clientv3.New(clientv3.Config{
      Endpoints: []string{"192.168.56.11:2379"},
      DialTimeout: time.Second*3,
   })

   defer client.Close()
   fmt.Printf("conn succ
")

   _,err = client.Put(context.Background(),"/logagent/conf","sddadas")

   if err != nil {
      fmt.Printf("Put faild,err:%v
",err)
   }
}
  • kafka消费代码
package main

import (
   "fmt"
   "github.com/Shopify/sarama"
   "sync"
)

var wg sync.WaitGroup

func main() {
   //连接配置
   consumer,err := sarama.NewConsumer([]string{"192.168.56.11:9092"},nil)
   if err != nil {
      fmt.Printf("consumer message faild,error:%v
",err)
      return
   }
   fmt.Printf("conn succ
")
   pt,err := consumer.Partitions("nginx_log")
   if err != nil {
      fmt.Printf("get partions aild,err:%v
",err)
      return
   }

   for _,p := range pt {
      pc, err := consumer.ConsumePartition("nginx_log",p,sarama.OffsetNewest)

      if err !=  nil {
         fmt.Printf("consumer faild,error:%v
",err)
         continue
      }
      wg.Add(1)
      go func() {
         for m := range pc.Messages() {
            fmt.Printf("topic:%v,value:%v
",m.Topic,string(m.Value))
         }
         wg.Done()
      }()
   }
   wg.Wait()
}

以上是关于使用etcd实现Master的选举功能的主要内容,如果未能解决你的问题,请参考以下文章

Go语言(二十)日志采集项目Etcd的使用

Go语言(二十)日志采集项目Etcd的使用

etcd 性能测试与调优

etcd 性能测试与调优

Zookeeper实现分布式选举算法

分布式键值存储系统ETCD调研