golang对etcd的简单操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang对etcd的简单操作相关的知识,希望对你有一定的参考价值。
参考技术A 首先获取clientv3:连接etcd:
kv是一个用于操作kv的连接,其实它本质上是用了client的conn,为了更加专注于键值对的操作,关闭client后也会使kv无法用。(kv的操作client也能实现)
设置一个超时的context:
context.WithTimeout()会返回一个timerCtx,并在这个结构体里注入了超时时间。cancleFunc是一个取消操作的函数。put,get等操作是阻塞型操作,context里有一个用于管理超时的select,当时间一到就会隐式执行cancelFunc,使操作停止并返回错误。如果显式的调用cancelFunc()则会立即停止操作,返回错误。
put操作:
由于etcd是有序存储键值对的,还可以附加clientv3.WithFromKey(),clientv3.WithLimit()来实现分页获取的效果。
监听etcd集群键的改变:
etcd 租约Watch功能分布式锁的golang实践
背景
本文使用 Golang语言的SDK包 go.etcd.io/etcd/clientv3 实践etcd的租约、Watch等功能,并且实现分布式锁的业务场景。
etcd 租约
etcd过期时间可以通过设置ttl的方式, 通过租约可以控制一组key的过期时间,可以通过续租的方式保持key不过期
//etcd 租约与续约实践
func LeaseTest(env string, ttl int64) (err error)
cli, err := getEtcdCli(env) //封装的clientv3客户端,不用太关心
if err != nil
return
lease := clientv3.NewLease(cli)
leaseGrant, err := lease.Grant(context.Background(), ttl) //声明一个租约,并且设置ttl
if err != nil
return
if _, err = cli.Put(context.Background(), "ping", "pong", clientv3.WithLease(leaseGrant.ID)); err != nil //设置key value 并且绑定租约
return
/*
保持长链接,每s续租一次
*/
keepRespChan, err := lease.KeepAlive(context.TODO(), leaseGrant.ID)
if err != nil
fmt.Println(err)
return
go func()
//查看续期情况 非必需,帮助观察续租的过程
for
select
case resp := <-keepRespChan:
if resp == nil
fmt.Println("租约失效")
return
else
fmt.Println("租约成功", resp)
()
for //持续检测key是否过期
values, err := cli.Get(context.Background(), "ping")
if err != nil
break
if values.Count == 0
fmt.Println("已经过期")
else
fmt.Println("没过期", values.Kvs)
time.Sleep(time.Second * 1)
return
不自动续约
把 lease.KeepAlive 去掉
ttl时间之后,租约过期后key删除
自动续约
使lease.KeepAlive生效,以及打印测试
可以看出自动续约就是没秒续约一次。
取消续约
两种形式
第一种:
lease.Revoke(context.Background(), leaseGrant.ID)
测试:
租约失效之后,租约的key会立马被删掉
第二种:
ctx, cancelFunc := context.WithCancel(context.TODO())
keepRespChan, err := lease.KeepAlive(ctx, leaseGrant.ID)
···
cancelFunc()
测试:
租约失效之后,key的ttl到之后删除key
Watch 机制
watch机制可以使客户端监听etcd的某个key的变化,可以实现配置推送,主动下发等业务场景
//etcd 的watch功能
func WatchTest(env string) (err error)
ctx := context.Background()
cli, err := getEtcdCli(env)
if err != nil
return err
go func()
for //模拟key的变化
cli.Put(ctx, "ping", "pong")
cli.Delete(ctx, "ping")
time.Sleep(time.Second)
()
pingVal, err := cli.Get(ctx, "ping")
if err != nil || len(pingVal.Kvs) == 0
return err
watchStartRevision := pingVal.Header.Revision + 1 //获取revision,观察这个revision之后的变化
fmt.Println(watchStartRevision)
watcher := clientv3.NewWatcher(cli)
ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func()
cancelFunc()
)
watchRespChan := watcher.Watch(ctx, "ping", clientv3.WithRev(watchStartRevision))
for watchResp := range watchRespChan
for _, event := range watchResp.Events
switch event.Type
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
return
可以看出Watch返回是一个chan,可以持续的监听
测试:
使用txn实现分布式锁
//锁的简单封装
type Lock struct
lease clientv3.Lease
leaseId clientv3.LeaseID
ctx context.Context
cancelFunc context.CancelFunc
func (l *Lock) Lock() (lock bool, err error)
cli, err := getEtcdCli("open")
if err != nil
return false, err
l.lease = clientv3.NewLease(cli)
l.ctx, l.cancelFunc = context.WithCancel(context.TODO())
leaseGrant, err := l.lease.Grant(context.TODO(), 5)
if err != nil
return false, err
l.leaseId = leaseGrant.ID
kv := clientv3.NewKV(cli)
txn := kv.Txn(l.ctx)
txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)).
Then(clientv3.OpPut("lock", "g", clientv3.WithLease(l.leaseId)))
txnResp, err := txn.Commit()
if err != nil
return false, err
if !txnResp.Succeeded
return false, nil
//自动续约
keepRespChan, err := l.lease.KeepAlive(l.ctx, l.leaseId)
_ = keepRespChan
return true, nil
func (l *Lock) Unlock()
//l.cancelFunc()
l.lease.Revoke(l.ctx, l.leaseId)
txn通过简单的"If-Then-Else"实现了原子操作,这里我们租期过期之后需要立刻将key删除,所以使用Revoke。
测试:
func LockTest()
go Node("node1", 5)
go Node("node2", 3)
select
func Node(node string, t time.Duration)
l := Lock
for
getLock, err := l.Lock()
if err != nil || !getLock
continue
fmt.Println("i get the lock: ", node)
time.Sleep(time.Second * t)
l.Unlock()
fmt.Println("i release the lock: ", node)
time.Sleep(time.Second)
本次代码:https://github.com/zhaoshoucheng/hodgepodge/tree/main/etcd
以上是关于golang对etcd的简单操作的主要内容,如果未能解决你的问题,请参考以下文章