使用redis实现分布式锁

Posted 没有梦想-何必远方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用redis实现分布式锁相关的知识,希望对你有一定的参考价值。

在高并发、分布式部署的场景中,为保证数据的一致性,常常会用到锁。举个例子,在修改金额这种比较敏感的数据,同一时间只能允许一人操作,如果有两人同时进行一条数据的修改,一个增加金额,一个减少金额,这时就很可能出现数据错误了。如果是单机操作,我们可以使用Synchronized同步实现锁,但对于集群分布式操作,Synchronized就鸡肋了,需要寻找别的方式来解决。

补充一下高并发下出现数据问题的原因:

数据库的隔离级别,默认是可重复度,基于mvvc多版本控制实现,不熟悉的可以先百度下 ~ 比如现在有个场景
,我们先查询一条记录,然后对这条记录的金额字段进行修改。高并发下两个事务同时进行这个操作,查询到同样的记录,金额都是6000,同时对这个金额进行减3000的操作,这时因为可重复度的隔离级别,两个事务是相互不可见的,所以同时对6000进行减3000的操作,最终这个记录的结果是3000,而我们实际需要的应该是0,因此出现数据错误。

分布式锁的实现基本有三种方式:基于数据库、基于缓存,或基于zookeeper。今天我们来看一下基于redis缓存来实现分布式锁的方式。

本文参考文章分布式锁1 Java常用技术方案

一、使用setnx()、expire()方法

首先说明一下setnx()命令,setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果key不存在,则设置当前key成功,返回1;如果当前key已经存在,则设置当前key失败,返回0。但是要注意的是setnx命令不能设置key的超时时间,只能通过expire()来对key设置。
具体的使用步骤如下:
1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功。
2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
3. 执行完业务代码后,可以通过delete命令删除key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机(比如意外断电)的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用redis的setnx()、get()和getset()方法来实现分布式锁。

二、使用setnx()、get()、getset()方法

首先说明一下getset()命令,该方法有两个参数,getset(key,newValue),给key设置一个新值newValue,并返回旧值oldValue,所以两个线程在锁超时争夺锁的时候,就可以比较getset后的新值是否和原来的oldValue相等来取得锁。多次操作会出现以下情况:
1. getset(key, “value1”) 返回nil 此时key的值会被设置为value1
2. getset(key, “value2”) 返回value1 此时key的值会被设置为value2
3. getset(key, “value3”) 返回value2 此时key的值会被设置为value3
……
这样执行第一条的线程会取得锁。
具体的使用步骤如下:
1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
5. 在获取到锁之后,当前线程可以开始自己的业务处理,执行完(不管成功失败)都释放锁。

画了一个简单的流程图,便于理解 :

相对来说,第二种方式更为严谨,即便出现执行expire方法时宕机的情况也不会有死锁发生,但实际上出现这个情况的几率是极小的,个人认为可以忽略不计。我在解决这个问题的时候用的是第二种方式,代码贴一下:

在BaseService类里面写了两个protected方法,一个获取锁,一个释放锁,需要用的方法继承下这个类,然后在业务方法开始和结束时调用下方法即可:

protected Boolean getLock(String methodName,String key)
        String lockkey = methodName + key;
        Long time = System.currentTimeMillis() + expireTime;
        String ThreadName = Thread.currentThread().getName();
        Long exist = redisObjectManager.setnx(lockkey,String.valueOf(time));
        if(exist == 1)
            LOGGER.info("========>" +ThreadName + "取到锁,执行业务~");
            return true;
        else
            Long currentTime = System.currentTimeMillis();
            String thisKey = redisObjectManager.get(lockkey);
            if(null == thisKey || "".equals(thisKey))
                LOGGER.info("========>" +ThreadName + "未取到锁,查询key失败");
                return false;
            
            Long oldTime = Long.valueOf(thisKey);
            if(currentTime > oldTime)
                //已超时,确认锁是否还在
                currentTime = Long.valueOf(redisObjectManager.getSet(lockkey,currentTime+""));
                if(currentTime == oldTime)
                    LOGGER.info("========>" +ThreadName + "取到锁,执行业务~");
                    return true;
                
            
        
        LOGGER.info("========>" +ThreadName + "未取到锁,等待~");
        return false;
    

    protected void releaseLock(String methodName,String key) 
        String lockkey = methodName + key;
        String ThreadName = Thread.currentThread().getName();
        LOGGER.info("========>" +ThreadName + "释放锁~");
        redisObjectManager.delKey(lockkey);
    

(加入日志是为了测试时更好的看到进行获取、争夺锁的情况)

调用:

 public RemoteResult<Integer> saveCreditBills(Long orderId,String contractURL) 
        String methodName = "saveCreditBills";
        Boolean ifHaslock = false;
        String key = "";
        RemoteResult<Integer> result = new RemoteResult<Integer>();
        try 

            key = orderId + " | freezeCredit";
            Boolean lock = getLock(methodName,key);
            for(int i = 0;i <5 ;i++)
                if(lock)
                    ifHaslock = true;
                    //成功获取锁,开始逻辑处理
                    break;
                else
                    //获取锁失败,短暂睡眠后重试
                    Thread.sleep(500);
                
            

            return result;
         catch(BusinessException e) 
            return businessException(result,e,methodName);
        catch(Exception e) 
            return encounterException(result,e,methodName);
        finally
            if(ifHaslock)//释放锁
                releaseLock(methodName,key);
            
        
    

现在可以用多线程进行测试,你会看到类似如下的日志信息:

重点内容通过数据对比,发现数据是正常,而不用锁控制的话,多线程操作会发生数据错误。至此,redis实现分布式锁大功告成! ^ ^

以上是关于使用redis实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

基于redis实现分布式锁

基于redis实现分布式锁

Redis 分布式锁的正确实现方式

Redis实现分布式锁

[Redis] 基于redis的分布式锁

基于redis实现分布式锁