etcd 租约Watch功能分布式锁的golang实践

Posted 平安与喜乐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd 租约Watch功能分布式锁的golang实践相关的知识,希望对你有一定的参考价值。

背景

本文使用 Golang语言的SDK包 go.etcd.io/etcd/clientv3 实践etcd的租约、Watch等功能,并且实现分布式锁的业务场景。

etcd 租约

etcd过期时间可以通过设置ttl的方式, 通过租约可以控制一组key的过期时间,可以通过续租的方式保持key不过期

//etcd 租约与续约实践
func LeaseTest(env string, ttl int64) (err error) 
	cli, err := getEtcdCli(env)  //封装的clientv3客户端,不用太关心
	if err != nil 
		return
	
	lease := clientv3.NewLease(cli)    
	leaseGrant, err := lease.Grant(context.Background(), ttl)  //声明一个租约,并且设置ttl
	if err != nil  
		return
	

	if _, err = cli.Put(context.Background(), "ping", "pong", clientv3.WithLease(leaseGrant.ID)); err != nil   //设置key value 并且绑定租约
		return
	
	/*
		保持长链接,每s续租一次 
	*/
	keepRespChan, err := lease.KeepAlive(context.TODO(), leaseGrant.ID)
	if err != nil 
		fmt.Println(err)
		return
	
	go func()         
		//查看续期情况 非必需,帮助观察续租的过程
		for 
			select 
			case resp := <-keepRespChan:
				if resp == nil 
					fmt.Println("租约失效")
					return
				 else 
					fmt.Println("租约成功", resp)
				
			
		
	()

	for             //持续检测key是否过期
		values, err := cli.Get(context.Background(), "ping")
		if err != nil 
			break
		
		if values.Count == 0 
			fmt.Println("已经过期")
		 else 
                        fmt.Println("没过期", values.Kvs)
                 
		time.Sleep(time.Second * 1)
	
	return

不自动续约

把 lease.KeepAlive 去掉

ttl时间之后,租约过期后key删除

自动续约

使lease.KeepAlive生效,以及打印测试

可以看出自动续约就是没秒续约一次。

取消续约

两种形式
第一种:

lease.Revoke(context.Background(), leaseGrant.ID)

测试:

租约失效之后,租约的key会立马被删掉
第二种:

ctx, cancelFunc := context.WithCancel(context.TODO())
keepRespChan, err := lease.KeepAlive(ctx, leaseGrant.ID)
···
cancelFunc()  

测试:

租约失效之后,key的ttl到之后删除key

Watch 机制

watch机制可以使客户端监听etcd的某个key的变化,可以实现配置推送,主动下发等业务场景

//etcd 的watch功能

func WatchTest(env string) (err error) 
	ctx := context.Background()
	cli, err := getEtcdCli(env)
	if err != nil 
		return err
	
	go func() 
		for   //模拟key的变化
			cli.Put(ctx, "ping", "pong")
			cli.Delete(ctx, "ping")
			time.Sleep(time.Second)
		
	()

	pingVal, err := cli.Get(ctx, "ping")
	if err != nil || len(pingVal.Kvs) == 0 
		return err
	
	watchStartRevision := pingVal.Header.Revision + 1  //获取revision,观察这个revision之后的变化
	fmt.Println(watchStartRevision)
	watcher := clientv3.NewWatcher(cli)
	ctx, cancelFunc := context.WithCancel(context.TODO())
	time.AfterFunc(5*time.Second, func() 
		cancelFunc()
	)
	watchRespChan := watcher.Watch(ctx, "ping", clientv3.WithRev(watchStartRevision))
	for watchResp := range watchRespChan 
		for _, event := range watchResp.Events 
			switch event.Type 
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
			
		
	
	return

可以看出Watch返回是一个chan,可以持续的监听
测试:

使用txn实现分布式锁

//锁的简单封装
type Lock struct 
	lease      clientv3.Lease
	leaseId    clientv3.LeaseID
	ctx        context.Context
	cancelFunc context.CancelFunc


func (l *Lock) Lock() (lock bool, err error) 
	cli, err := getEtcdCli("open")
	if err != nil 
		return false, err
	
	l.lease = clientv3.NewLease(cli)
	l.ctx, l.cancelFunc = context.WithCancel(context.TODO())
	leaseGrant, err := l.lease.Grant(context.TODO(), 5)
	if err != nil 
		return false, err
	
	l.leaseId = leaseGrant.ID
	kv := clientv3.NewKV(cli)
	txn := kv.Txn(l.ctx)          
	txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
		Then(clientv3.OpPut("lock", "g", clientv3.WithLease(l.leaseId)))
	txnResp, err := txn.Commit()
	if err != nil 
		return false, err
	
	if !txnResp.Succeeded 
		return false, nil
	
	//自动续约
	keepRespChan, err := l.lease.KeepAlive(l.ctx, l.leaseId)
	_ = keepRespChan
	return true, nil

func (l *Lock) Unlock() 
	//l.cancelFunc()
	l.lease.Revoke(l.ctx, l.leaseId)

txn通过简单的"If-Then-Else"实现了原子操作,这里我们租期过期之后需要立刻将key删除,所以使用Revoke。
测试:

func LockTest() 
	go Node("node1", 5)
	go Node("node2", 3)
	select 


func Node(node string, t time.Duration) 
	l := Lock
	for 
		getLock, err := l.Lock()
		if err != nil || !getLock 
			continue
		
		fmt.Println("i get the lock: ", node)
		time.Sleep(time.Second * t)
		l.Unlock()
		fmt.Println("i release the lock: ", node)
		time.Sleep(time.Second)
	

本次代码:https://github.com/zhaoshoucheng/hodgepodge/tree/main/etcd

以上是关于etcd 租约Watch功能分布式锁的golang实践的主要内容,如果未能解决你的问题,请参考以下文章

golang etcd_watch_test

基于ETCD实现分布式锁&实战:控制多个应用仅一台执行任务

基于ETCD实现分布式锁&实战:控制多个应用仅一台执行任务

基于ETCD实现分布式锁&实战:控制多个应用仅一台执行任务

云原生 etcd 系列-6|租约机制

Go语言基于Etcd实现的定时任务