从0实现分布式任务调度系统--etcd基础操作
Posted 尚书左仆射
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0实现分布式任务调度系统--etcd基础操作相关的知识,希望对你有一定的参考价值。
- 下载: etcd-v3.3.18-linux-amd64.tar.gz ;解压: tar -zxvf etcd-v3.3.18-linux-amd64.tar.gz
解压后的情况如下:
其中etcd是服务端程序,etcdctl是客户端程序。
2. 启动etcd服务(后台启动,并监听公网IP,默认只监听localhost):
nohup ./etcd --listen-client-urls ‘http://0.0.0.0:2379’ --advertise-client-urls ‘http://0.0.0.0:2379’ &
使用less nohup.out 命令可以查看nohup.out 中的日志输出。
3. 设置环境变量,将etcd的接口版本切换到V3
export ETCDCTL_API=3
4. 下载etcd go语言客户端代码:
https://github.com/etcd-io/etcd
使用go get 下载速度很慢,可以去golang中国下载相应的包。
将GitHub中项目代码对应的路径复制到“导入路径”这一栏目中(不能有HTTP前缀,不然会报错)。
5. 调整etcdclientv3的目录,导入时如下所示:
GoWorks\\src\\go.etcd.io\\etcd (etcd为下载后解压的代码)
完成前面5步后,就可以开始使用etcd 的go语言接口进行编程操作了。
获取指定目录下的内容:
func main()
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
//putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
)
config = clientv3.Config
Endpoints:[]string"11.32.225.63:2379", // 集群列表
DialTimeout:5 * time.Second,
// 建立一个客户端
if client,err = clientv3.New(config); err != nil
fmt.Println(err)
return
defer client.Close()
// 读写etcd的键值对
kv = clientv3.NewKV(client)
// 读取/cron/jobs/为前缀的所有key
if getResp, err = kv.Get(context.TODO(), "/corn/jobs/", clientv3.WithPrefix()); err != nil
fmt.Println(err)
else
// 获取成功,遍历所有的kvs
fmt.Println(getResp.Kvs)
clientv3.KV下的操作包括最基础的Get/Put/Delete,分别用于获取/存入/删除一个键值对。
租约和事务操作:
func main()
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
lease clientv3.Lease
leaseGrantResp *clientv3.LeaseGrantResponse
leaseID clientv3.LeaseID
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
ctx context.Context
cancelFunc context.CancelFunc
txn clientv3.Txn
txnResp *clientv3.TxnResponse
)
config = clientv3.Config
Endpoints:[]string"11.32.225.63:2379", // 集群列表
DialTimeout:5 * time.Second,
// 建立一个客户端
if client,err = clientv3.New(config); err != nil
fmt.Println(err)
return
defer client.Close()
//lease 实现锁自动过期
//op操作
//txn事务: if else then
// 申请一个租约lease
lease = clientv3.NewLease(client)
// 申请一个5s的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5);err != nil
fmt.Println(err)
return
// 拿到租约的ID
leaseID = leaseGrantResp.ID
// 准备一个用于取消自动续租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
// 确保函数退出后,自动续租会停止
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseID)
// 自动续约,5s后取消自动续约
if keepRespChan,err = lease.KeepAlive(ctx, leaseID);err != nil
fmt.Println(err)
return
// 处理续约的应答协程
go func()
for
select
case keepResp = <-keepRespChan:
if keepResp == nil
fmt.Println("租约已经失效了")
goto END
else
fmt.Println("收到自动续约应答:", keepResp.ID)
END:
()
// if 不存在key, then设置它,else抢锁失败
kv = clientv3.NewKV(client)
// 1 创建事务
txn = kv.Txn(context.TODO())
//定义事务
// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "XXX", clientv3.WithLease(leaseID))).
Else(clientv3.OpGet("/cron/lock/job9")) // 抢锁失败
// 提交事务
if txnResp,err = txn.Commit();err!=nil
fmt.Println(err)
return
// 判断是否抢到锁
if !txnResp.Succeeded
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
// 2 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3 释放锁(取消自动续租,释放租约)
// defer会把租约释放掉,关联的KV就会被删除
接口函数有很多,不能一一举例了,自己试着调用一下,感觉学起来也还是挺快的。
以上是关于从0实现分布式任务调度系统--etcd基础操作的主要内容,如果未能解决你的问题,请参考以下文章