从0实现分布式任务调度系统--etcd基础操作

Posted 尚书左仆射

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从0实现分布式任务调度系统--etcd基础操作相关的知识,希望对你有一定的参考价值。

  1. 下载: etcd-v3.3.18-linux-amd64.tar.gz  ;解压: tar -zxvf etcd-v3.3.18-linux-amd64.tar.gz

解压后的情况如下:

其中etcd是服务端程序,etcdctl是客户端程序。

 

    2. 启动etcd服务(后台启动,并监听公网IP,默认只监听localhost):

nohup ./etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379  &

使用less nohup.out 命令可以查看nohup.out 中的日志输出。

 

    3. 设置环境变量,将etcd的接口版本切换到V3

export ETCDCTL_API=3

 

    4. 下载etcd  go语言客户端代码:

https://github.com/etcd-io/etcd

使用go get 下载速度很慢,可以去golang中国下载相应的包。

将GitHub中项目代码对应的路径复制到“导入路径”这一栏目中(不能有HTTP前缀,不然会报错)。

    5. 调整etcdclientv3的目录,导入时如下所示:

GoWorks\\src\\go.etcd.io\\etcd  (etcd为下载后解压的代码)

完成前面5步后,就可以开始使用etcd 的go语言接口进行编程操作了。

 

获取指定目录下的内容:

func main() 
   var (
      config clientv3.Config
      client *clientv3.Client
      err error
      kv clientv3.KV
      //putResp *clientv3.PutResponse
      getResp *clientv3.GetResponse
   )

   config = clientv3.Config
      Endpoints:[]string"11.32.225.63:2379",  // 集群列表
      DialTimeout:5 * time.Second,
   

   // 建立一个客户端
   if client,err = clientv3.New(config); err != nil
      fmt.Println(err)
      return
   
   defer client.Close()

   // 读写etcd的键值对
   kv = clientv3.NewKV(client)

   // 读取/cron/jobs/为前缀的所有key
   if getResp, err = kv.Get(context.TODO(), "/corn/jobs/", clientv3.WithPrefix()); err != nil
      fmt.Println(err)
   else
      // 获取成功,遍历所有的kvs
      fmt.Println(getResp.Kvs)
   

clientv3.KV下的操作包括最基础的Get/Put/Delete,分别用于获取/存入/删除一个键值对。

 

租约和事务操作:

func main() 

   var (
      config clientv3.Config
      client *clientv3.Client
      err error
      kv clientv3.KV
      lease clientv3.Lease
      leaseGrantResp *clientv3.LeaseGrantResponse
      leaseID clientv3.LeaseID
      keepResp *clientv3.LeaseKeepAliveResponse
      keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
      ctx context.Context
      cancelFunc context.CancelFunc
      txn clientv3.Txn
      txnResp *clientv3.TxnResponse
   )

   config = clientv3.Config
      Endpoints:[]string"11.32.225.63:2379",  // 集群列表
      DialTimeout:5 * time.Second,
   

   // 建立一个客户端
   if client,err = clientv3.New(config); err != nil
      fmt.Println(err)
      return
   
   defer client.Close()

   //lease 实现锁自动过期
   //op操作
   //txn事务: if else then

   // 申请一个租约lease
   lease = clientv3.NewLease(client)

   // 申请一个5s的租约
   if leaseGrantResp, err = lease.Grant(context.TODO(), 5);err != nil
      fmt.Println(err)
      return
   

   // 拿到租约的ID
   leaseID = leaseGrantResp.ID

   // 准备一个用于取消自动续租的context
   ctx, cancelFunc = context.WithCancel(context.TODO())

   // 确保函数退出后,自动续租会停止
   defer cancelFunc()
   defer lease.Revoke(context.TODO(), leaseID)

   // 自动续约,5s后取消自动续约
   if keepRespChan,err = lease.KeepAlive(ctx, leaseID);err != nil
      fmt.Println(err)
      return
   

   // 处理续约的应答协程
   go func() 
      for
         select 
         case keepResp = <-keepRespChan:
            if keepResp == nil
               fmt.Println("租约已经失效了")
               goto END
            else
               fmt.Println("收到自动续约应答:", keepResp.ID)
            

         
      
   END:
   ()

   // if 不存在key, then设置它,else抢锁失败
   kv = clientv3.NewKV(client)

   // 1 创建事务
   txn = kv.Txn(context.TODO())

   //定义事务

   // 如果key不存在
   txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
      Then(clientv3.OpPut("/cron/lock/job9", "XXX", clientv3.WithLease(leaseID))).
      Else(clientv3.OpGet("/cron/lock/job9")) // 抢锁失败

   // 提交事务
   if txnResp,err = txn.Commit();err!=nil
      fmt.Println(err)
      return
   

   // 判断是否抢到锁
   if !txnResp.Succeeded
      fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
      return
   

   // 2 处理业务
   fmt.Println("处理任务")
   time.Sleep(5 * time.Second)

   // 3 释放锁(取消自动续租,释放租约)
   // defer会把租约释放掉,关联的KV就会被删除

接口函数有很多,不能一一举例了,自己试着调用一下,感觉学起来也还是挺快的。

以上是关于从0实现分布式任务调度系统--etcd基础操作的主要内容,如果未能解决你的问题,请参考以下文章

go任务调度5(go操作etcd)

go任务调度6(etcd租约机制/自动过期)

从0实现分布式任务调度系统--实现cron表达式定时任务

go任务调度9(op实现分布式乐观锁)

go任务调度3(etcd协调服务raft协议)

go任务调度11(分布式crontab架构分析)