go版本分布式锁redsync使用教程

Posted 神技圈子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了go版本分布式锁redsync使用教程相关的知识,希望对你有一定的参考价值。

redsync使用教程

前言

在编程语言中锁可以理解为一个变量,该变量在同一时刻只能有一个线程拥有,以便保护共享数据在同一时刻只有一个线程去操作。对于高可用的分布式锁应该满足以下条件:
1.互斥:在任意时间内,只有一个客户能够获得一把锁,具有排他性。
2.避免死锁:即使客户端宕机或者从集群中分离了,其它客户端仍然可以获取到该锁
3.容错:只要大部分Redis节点存活,客户端就能正确地获取锁和释放锁。即使锁住某个资源的客户端释放锁之前崩溃或者网络分区仍然能够获取锁和释放锁。
对于Redis高可用集群而言,上述三个条件都非常容易满足,所以适合做分布式锁。

redsync结构

redsync 的通用结构定义如下:

  • Pool:抽象连接池
  • Conn:抽象每个 Redis 连接
  • Script:Redis 脚本

Pool结构

redsync结构的Pools是一个redis.pool数组,每个 redis.Pool 都是上面的 Pool 实现,它代表了一个 Redis 实例的连接池:

Mutex结构

Mutex代表了一个分布式锁,其成员多为 redlock 算法所需要的条件:

// A Mutex is a distributed mutual exclusion lock.
type Mutex struct 
	name   string  				// 名称
	expiry time.Duration    	// 锁的有效时间
	tries     int   // 尝试次数
	delayFunc DelayFunc // 失败尝试设置延迟

	factor float64 // 误差系数控制

	quorum int // 投票数 一般为节点数 / 2+1,节点数为奇数

	genValueFunc func() (string, error)  // 加密函数,生成唯一随机串
	value        string  // 默认就是唯一随机串
	until        time.Time // 过期时间

	pools []Pool // 连接池(每个 Pool 指一个 Redis 实例)

获取锁的Lock方法实现了redLock的加锁接口,具体实现如下

func (m *Mutex) LockContext(ctx context.Context) error 
	if ctx == nil 
		ctx = context.Background()
	

   //生成随机串,base64
	value, err := m.genValueFunc()
	if err != nil 
		return err
	

   //不超过tries次数进行加锁
	for i := 0; i < m.tries; i++ 
		if i != 0 
			time.Sleep(m.delayFunc(i))
		

		start := time.Now()

		n, err := func() (int, error) 
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			//尝试异步去获取锁
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) 
				return m.acquire(ctx, pool, value)
			)
		()

		now := time.Now()
		// 过期时间 = 有效时间值 - 获取锁消耗的时间值 - 有效时间值 * 误差系数
		until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
	  //成功节点数>=节点数/2+1&& 未过期时,判定加锁成功	
      if n >= m.quorum && now.Before(until) 
			m.value = value
			m.until = until
			return nil
		
		func() (int, error) 
			ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
			defer cancel()
			//获取锁失败,尝试异步去释放锁
			return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) 
				return m.release(ctx, pool, value)
			)
		()
		if i == m.tries-1 && err != nil 
			return err
		
	

	return ErrFailed

time.Sleep(m.delayFunc(i))的失败重试逻辑是当客户端无法获取锁时会设置一个随机值来重试。这个随机值应当和申请锁时间错开,减少脑裂的可能性。此外,还调用了actOnPoolsAsync来实现非阻塞方式同时向多个Redis实例发送set请求。我们来看下actOnPoolsAsync是如何定义的。

func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) 
	type result struct 
		Node   int
		Status bool
		Err    error
	

	ch := make(chan result)
	for node, pool := range m.pools 
		go func(node int, pool redis.Pool) 
			r := resultNode: node
			r.Status, r.Err = actFn(pool)
			ch <- r
		(node, pool)
	
	n := 0
	var taken []int
	var err error
	for range m.pools 
		r := <-ch
		if r.Status 
			n++
		 else if r.Err != nil 
			err = multierror.Append(err, &RedisErrorNode: r.Node, Err: r.Err)
		 else 
			taken = append(taken, r.Node)
			err = multierror.Append(err, &ErrNodeTakenNode: r.Node)
		
	

	if len(taken) >= m.quorum 
		return n, &ErrTakenNodes: taken
	
	return n, err


acquire加锁操作

func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) 
	conn, err := pool.Get(ctx)
	if err != nil 
		return false, err
	
	defer conn.Close()
	reply, err := conn.SetNX(m.name, value, m.expiry)
	if err != nil 
		return false, err
	
	return reply, nil

release解锁操作

func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) 
	conn, err := pool.Get(ctx)
	if err != nil 
		return false, err
	
	defer conn.Close()
	//调用Eval,以脚本方式释放锁
	status, err := conn.Eval(deleteScript, m.name, value)
	if err != nil 
		return false, err
	
	return status != int64(0), nil

redsync包的使用

该包的使用很简单,具体步骤如下:

  • 首先,创建一个Redis的客户端连接;
  • 将该客户端连接加入到Redis的Pool中;
  • redsync基于该Redis Pool进行实例化;
  • 通过redsync实例的NewMutex就可以基于一个具体的key新建一个分布式锁,
    该包进行实例化时有基于Redis的单机模式和集群模式两种使用方式,在使用上主要有两种区别:
  • Redis的客户端是以集群模式还是单机模式创建;
  • 在导入redsync包时,集群模式需要导入goredis/v8的版本
    具体例子如下:
func main() 
	//创建redis的客户端连接
	cli := goredislib.NewClient(&goredislib.Options
		Addr: "localhost:6379",
	)

	pool := goredis.NewPool(cli)
	rs := redsync.New(pool)

	mutexname := "test-global-mutex"

	mutex := rs.NewMutex(mutexname)

	if err := mutex.Lock(); err != nil 
		panic(err)
	

	if ok, err := mutex.Unlock(); !ok || err == nil 
		panic("unlock failed")
	


以上是关于go版本分布式锁redsync使用教程的主要内容,如果未能解决你的问题,请参考以下文章

Go + Redis 实现分布式锁

用 Go + Redis 实现分布式锁

Redis 中使用 Lua 脚本

redis教程-----redis事务记录日志到redis分布式锁

Redis分布式锁不生效

[视频]redis分布式锁解决并发安全问题(抢票秒杀)