分布式锁的实现方式

Posted 三名狂客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁的实现方式相关的知识,希望对你有一定的参考价值。

一、redis分布式锁

官方叫做 RedLock 算法,是 redis 官方支持的分布式锁算法。

分布式锁有 3 个重要的考量点:

(1)互斥(只能有一个客户端获取锁)
(2)不能死锁
(3)容错(只要大部分 redis 节点创建了这把锁就可以)

/**
 * 操作redis缓存类
 */
@Slf4j
@Service
public class RedisLockService 
    // 锁过期时间
    private static final long REDIS_LOCK_EXPIRE = 5000L;

    // Redis获取锁单次重试等待时间
    private static final long REDIS_LOCK_WAIT = 100L;

    @Resource
    private RedisClient<?> redisClient;

    /**
     * @param key RedisKey
     * @param ttl 过期时间
     * @return 是否成功获取锁
     */
    public boolean getRedisLock(String key, long ttl) 
        return getRedisLock(key, 1, ttl);
    

    /**
     * @param key        RedisKey
     * @param retryTimes 重试次数
     * @param ttl        过期时间
     * @return 是否成功获取锁
     */
    public boolean getRedisLock(String key, int retryTimes, long ttl) 
        if (StringUtils.isBlank(key)) 
            return false;
        
        for (int i = 0; i < retryTimes; i++) 
            boolean exists = redisClient.setNxDistributedLock(key, key, ttl);
            if (exists) 
                return true;
            
            try 
                TimeUnit.MILLISECONDS.sleep(REDIS_LOCK_WAIT);
             catch (Exception e) 
                log.error("sleep fail", e);
            
        

        return false;
    

    /**
     * @param key        RedisKey
     * @param retryTimes 重试次数
     * @return 是否成功获取锁
     */
    public boolean getRedisLock(String key, int retryTimes) 
        return getRedisLock(key, retryTimes, REDIS_LOCK_EXPIRE);
    

    /**
     * 查询key是否过期
     *
     * @param key RedisKey
     * @return 是否过期
     */
    public boolean isExpire(String key) 
        Long ttl = redisClient.ttl(key);
        if (ttl == null) 
            return true;
        
        return ttl != -1;
    

    /**
     * 删除RedisKey
     *
     * @param key RedisKey
     */
    public void delRedisLock(String key) 
        if (StringUtils.isBlank(key)) 
            return;
        
        redisClient.del(key);
    

    /**
     * 设置key的过期时间为5000ms
     *
     * @param key RedisKey
     */
    public void pExpire(String key) 
        if (StringUtils.isBlank(key)) 
            return;
        
        redisClient.pExpire(key, REDIS_LOCK_EXPIRE);
    

    /**
     * 设置key的过期时间(ms)
     *
     * @param key    RedisKey
     * @param millis 过期时间
     */
    public void pExpire(String key, Long millis) 
        if (StringUtils.isBlank(key) || millis == null || millis <= 0) 
            return;
        
        redisClient.pExpire(key, millis);
    


import lombok.Getter;
import lombok.Setter;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;

import java.io.Serializable;
import java.util.List;

public class RedisClient<V extends Serializable> 
    @Getter
    @Setter
    private RedisTemplate<Serializable, V> redisTemplate = null;

    public byte[] doSerialize(String str) 
        return redisTemplate.getStringSerializer().serialize(str);
    

    public String doDeSerialize(byte[] bytes) 
        return redisTemplate.getStringSerializer().deserialize(bytes);
    

    /**
     * 根据key设置元素(String)的过期时间
     */
    public Boolean pExpire(final String key, final Long millis) 
        return redisTemplate.execute((RedisCallback<Boolean>) connection -> 
            if (millis > 0) 
                return connection.pExpire(doSerialize(key), millis);
            
            return false;
        );
    

    /**
     * 增加key value,并设置存活时间(秒),当存活时间为0时,会被删除
     * 如果key已经有值,则新值覆盖旧值
     */
    public void set(final String key, final String value) 
        set(key, value, 0L);
    

    public void set(final String key, final String value, final Long liveTime) 
        redisTemplate.execute((RedisCallback<Long>) connection -> 
            connection.set(doSerialize(key), doSerialize(value));
            if (liveTime > 0) 
                connection.expire(doSerialize(key), liveTime);
            
            return 1L;
        );
    

    /**
     * 根据key获取元素(String)
     */
    public String get(final String key) 
        return redisTemplate.execute((RedisCallback<String>) connection -> 
            String getStr = null;
            byte[] value = connection.get(doSerialize(key));
            if (value != null) 
                getStr = doDeSerialize(value);
            
            return getStr;
        );
    

    /**
     * 在指定的 key 不存在时,为 key 设置指定的值
     */
    public Boolean setnx(final String key, final String obj) 
        return redisTemplate.execute(
                (RedisCallback<Boolean>) connection -> connection.setNX(doSerialize(key), doSerialize(obj)));
    

    /**
     * 删除指定key对应的元素,key不存在时会被忽略
     * 返回被删除key的数量
     */
    public Long del(final String key) 
        return redisTemplate.execute((RedisCallback<Long>) connection -> connection.del(doSerialize(key)));
    

    /**
     * 压栈,并设置超时时间,单位 秒
     */
    public void lPush(String key, byte[] bytes, long liveTime) 
        redisTemplate.execute((RedisCallback<Integer>) connection -> 
            connection.lPush(doSerialize(key), bytes);
            connection.expire(doSerialize(key), liveTime);
            return 0;
        );
    

    /**
     * 根据key返回对应的list
     */
    public List<byte[]> lRange(String key) 
        return redisTemplate.execute(
                (RedisCallback<List<byte[]>>) connection -> connection.lRange(doSerialize(key), 0, -1));
    

    /**
     * 向队列尾部添加数据
     */
    public void rPush(String key, String value) 
        redisTemplate.execute((RedisCallback<Integer>) connection -> 
            connection.rPush(doSerialize(key), doSerialize(value));
            return 0;
        );
    

    /**
     * 从队列首部删除并获取一条数据
     */
    public String lPop(String key) 
        return redisTemplate.execute((RedisCallback<String>) connection -> 
            byte[] value = connection.lPop(doSerialize(key));
            if (value != null) 
                return doDeSerialize(value);
            
            return null;
        );
    

    /**
     * 修剪(删除)key所对应的0 ~ length之间的元素
     * example:
     * key : 1
     * key : 2
     * key : 3
     * lTrim("key",2) 即只保留前三个元素,第1~3个元素将被保留,其余被删除,0代表第一个元素
     * length 为-1,标识最后一个元素,-2为倒数第二个元素,以此类推
     */
    public void lTrim(String key, long length) 
        redisTemplate.execute((RedisCallback<Integer>) connection -> 
            connection.lTrim(doSerialize(key), 0, length);
            return 0;
        );
    

    public void lRem(String key, Object value) 
        lRem(key, 0, value);
    

    public void lRem(String key, int count, Object value) 
        redisTemplate.execute((RedisCallback<Integer>) connection -> 
            connection.lRem(doSerialize(key), count, ProtostuffSerialize.serialize(value));
            return 0;
        );
    

    public Object getObjectByProtostuff(final String key, Class<?> cls) 
        return redisTemplate.execute((RedisCallback<Object>) connection -> 
            byte[] value = connection.get(doSerialize(key));
            if (value != null) 
                return ProtostuffSerialize.deserialize(value, cls);
            
            return null;
        );
    

    public Boolean setObjectByProtostuff(final String key, final Object obj, final Long liveTime) 
        return redisTemplate.execute((RedisCallback<Boolean>) connection -> 
            Boolean flag = connection.set(doSerialize(key), ProtostuffSerialize.serialize(obj));
            if (liveTime > 0) 
                connection.expire(doSerialize(key), liveTime);
            
            return flag;
        );
    

    public Boolean setObjectByProtostuffExpireAt(final String key, final Object obj, final Long expireAt) 
        return redisTemplate.execute((RedisCallback<Boolean>) connection -> 
            Boolean flag = connection.set(doSerialize(key), ProtostuffSerialize.serialize(obj));
            connection.expireAt(doSerialize(key), expireAt);
            return flag;
        );
    

    public Boolean setNx(final String key, final Object object, final Long liveTime) 
        return redisTemplate.execute((RedisCallback<Boolean>) connection -> 
            boolean flag = connection.setNX(doSerialize(key), ProtostuffSerialize.serialize(object));
            if (flag && liveTime > 0) 
                connection.expire(doSerialize(key), liveTime);
            
            return flag;
        );
    

    public Long ttl(final String key) 
        return redisTemplate.execute((RedisCallback<Long>) connection -> connection.ttl(doSerialize(key)));
    

    /**
     * 分布式锁专用获取锁
     *
     * @param key      锁关键字
     * @param uniqueId 锁唯一标识
     * @param liveTime 过期时间
     * @return
     */
    public Boolean setNxDistributedLock(final String key, final String uniqueId, final Long liveTime) 
        return redisTemplate.execute(
                (RedisCallback<Boolean>) connection -> connection.set(doSerialize(key), doSerialize(uniqueId),
                        Expiration.milliseconds(liveTime), RedisStringCommands.SetOption.ifAbsent()));
    

    /**
     * Set集合添加元素
     *
     * @param key
     * @param value
     * @return
     */
    public Long sAdd(final String key, final String value) 
        return redisTemplate.execute((RedisCallback<Long>) connection -> 
            return connection.sAdd(doSerialize(key), doSerialize(value));
        );
    

    /**
     * Set集合添加多个元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sAdds(final String key, final String[] values) 
        return redisTemplate.execute((RedisCallback<Long>) connection -> 
            long success = 0;
            for (int i = 0; i < values.length; i++) 
                success += connection.sAdd(doSerialize(key), doSerialize(values[i]));
            
            return success;
        );
    

    /**
     * Set集合取出1个元素并删除
     *
     * @param key
     * @return
     */
    public String sPop(final String key) 
        return redisTemplate.execute((RedisCallback<String>) connection -> 
            byte[] value = connection.sPop(doSerialize(key));
            if (value != null) 
                return doDeSerialize(value);
            
            return null;
        );
    

    /**
     * Set集合取出count个元素并删除
     *
     * @param key
     * @param count
     * @return
     */
    public List<byte[]> sPops(final String key, final int count) 
        return redisTemplate.execute((RedisCallback<List<byte[]>>) connection -> 
            return connection.sPop(doSerialize(key), count);
        );
    

二、zk分布式锁

zk 分布式锁,其实可以做的比较简单,就是某个节点尝试创建临时 znode,此时创建成功了就获取了这个锁;这个时候别的客户端来创建锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个 znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以再次重新加锁。如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁;后面的每个人都会去监听排在自己前面的那个人创建的 node 上,一旦某个人释放了锁,排在自己后面的人就会被 zookeeper 给通知,一旦被通知了之后,就 ok 了,自己就获取到了锁,就可以执行代码了。

public class ZooKeeperDistributedLock implements Watcher 

    private ZooKeeper zk;
    private String locksRoot = "/locks";
    private String productId;
    private String waitNode;
    private String lockNode;
    private CountDownLatch latch;
    private CountDownLatch connectedLatch = new CountDownLatch(1);
    private int sessionTimeout = 30000;

    public ZooKeeperDistributedLock(String productId) 
        this.productId = productId;
        try 
            String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
            zk = new ZooKeeper(address, sessionTimeout, this);
            connectedLatch.await();
         catch (IOException e) 
            throw new LockException(e);
         catch (KeeperException e) 
            throw new LockException(e);
         catch (InterruptedException e) 
            throw new LockException(e);
        
    

    public void process(WatchedEvent event) 
        if (event.getState() == KeeperState.SyncConnected) 
            connectedLatch.countDown();
            return;
        

        if (this.latch != null) 
            this.latch.countDown();
        
    

    public void acquireDistributedLock() 
        try 
            if (this.tryLock()) 
                return;
             else 
                waitForLock(waitNode, sessionTimeout);
            
         catch (KeeperException e) 
            throw new LockException(e);
         catch (InterruptedException e) 
            throw new LockException(e);
        
    

    public boolean tryLock() 
        try 
 		    // 传入进去的locksRoot + “/” + productId
		    // 假设productId代表了一个商品id,比如说1
		    // locksRoot = locks
		    // /locks/10000000000,/locks/10000000001,/locks/10000000002
            lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   
            // 看看刚创建的节点是不是最小的节点
	 	    // locks:10000000000,10000000001,10000000002
            List<String> locks = zk.getChildren(locksRoot, false);
            Collections.sort(locks);
	
            if(lockNode.equals(locksRoot+"/"+ locks.get(0)))
                //如果是最小的节点,则表示取得锁
                return true;
            
	
            //如果不是最小的节点,找到比自己小1的节点
	  int previousLockIndex = -1;
            for(int i = 0; i < locks.size(); i++) 
		if(lockNode.equals(locksRoot + “/” + locks.get(i))) 
	         	    previousLockIndex = i - 1;
		    break;
		
	   
	   
	   this.waitNode = locks.get(previousLockIndex);
         catch (KeeperException e) 
            throw new LockException(e);
         catch (InterruptedException e) 
            throw new LockException(e);
        
        return false;
    

    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException 
        Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
        if (stat != null) 
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        
        return true;
    

    public void unlock() 
        try 
            // 删除/locks/10000000000节点
            // 删除/locks/10000000001节点
            System.out.println("unlock " + lockNode);
            zk.delete(lockNode, -1);
            lockNode = null;
            zk.close();
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (KeeperException e) 
            e.printStackTrace();
        
    

    public class LockException extends RuntimeException 
        private static final long serialVersionUID = 1L;

        public LockException(String e) 
            super(e);
        

        public LockException(Exception e) 
            super(e);
        
    

三、二者比较(redis 分布式锁和 zk 分布式锁的对比)

redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。

另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。

redis 分布式锁大家没发现好麻烦吗?遍历上锁,计算时间等等zk 的分布式锁语义清晰实现简单。

所以先不分析太多的东西,就说这两点,我个人实践认为 zk 的分布式锁比 redis 的分布式锁牢靠、而且模型简单易用。

以上是关于分布式锁的实现方式的主要内容,如果未能解决你的问题,请参考以下文章

分布式锁的实现方式

Redis 分布式锁的正确实现方式

Redis分布式锁的正确实现方式

Redis分布式锁的正确实现方式

Redis分布式锁的正确实现方式

Redis分布式锁的正确实现方式