使用redis实现分布式锁
Posted 没有梦想-何必远方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用redis实现分布式锁相关的知识,希望对你有一定的参考价值。
在高并发、分布式部署的场景中,为保证数据的一致性,常常会用到锁。举个例子,在修改金额这种比较敏感的数据,同一时间只能允许一人操作,如果有两人同时进行一条数据的修改,一个增加金额,一个减少金额,这时就很可能出现数据错误了。如果是单机操作,我们可以使用Synchronized同步实现锁,但对于集群分布式操作,Synchronized就鸡肋了,需要寻找别的方式来解决。
补充一下高并发下出现数据问题的原因:
数据库的隔离级别,默认是可重复度,基于mvvc多版本控制实现,不熟悉的可以先百度下 ~ 比如现在有个场景
,我们先查询一条记录,然后对这条记录的金额字段进行修改。高并发下两个事务同时进行这个操作,查询到同样的记录,金额都是6000,同时对这个金额进行减3000的操作,这时因为可重复度的隔离级别,两个事务是相互不可见的,所以同时对6000进行减3000的操作,最终这个记录的结果是3000,而我们实际需要的应该是0,因此出现数据错误。
分布式锁的实现基本有三种方式:基于数据库、基于缓存,或基于zookeeper。今天我们来看一下基于redis缓存来实现分布式锁的方式。
本文参考文章分布式锁1 Java常用技术方案
一、使用setnx()、expire()方法
首先说明一下setnx()命令,setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果key不存在,则设置当前key成功,返回1;如果当前key已经存在,则设置当前key失败,返回0。但是要注意的是setnx命令不能设置key的超时时间,只能通过expire()来对key设置。
具体的使用步骤如下:
1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功。
2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
3. 执行完业务代码后,可以通过delete命令删除key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机(比如意外断电)的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用redis的setnx()、get()和getset()方法来实现分布式锁。
二、使用setnx()、get()、getset()方法
首先说明一下getset()命令,该方法有两个参数,getset(key,newValue),给key设置一个新值newValue,并返回旧值oldValue,所以两个线程在锁超时争夺锁的时候,就可以比较getset后的新值是否和原来的oldValue相等来取得锁。多次操作会出现以下情况:
1. getset(key, “value1”) 返回nil 此时key的值会被设置为value1
2. getset(key, “value2”) 返回value1 此时key的值会被设置为value2
3. getset(key, “value3”) 返回value2 此时key的值会被设置为value3
……
这样执行第一条的线程会取得锁。
具体的使用步骤如下:
1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
5. 在获取到锁之后,当前线程可以开始自己的业务处理,执行完(不管成功失败)都释放锁。
画了一个简单的流程图,便于理解 :
相对来说,第二种方式更为严谨,即便出现执行expire方法时宕机的情况也不会有死锁发生,但实际上出现这个情况的几率是极小的,个人认为可以忽略不计。我在解决这个问题的时候用的是第二种方式,代码贴一下:
在BaseService类里面写了两个protected方法,一个获取锁,一个释放锁,需要用的方法继承下这个类,然后在业务方法开始和结束时调用下方法即可:
protected Boolean getLock(String methodName,String key)
String lockkey = methodName + key;
Long time = System.currentTimeMillis() + expireTime;
String ThreadName = Thread.currentThread().getName();
Long exist = redisObjectManager.setnx(lockkey,String.valueOf(time));
if(exist == 1)
LOGGER.info("========>" +ThreadName + "取到锁,执行业务~");
return true;
else
Long currentTime = System.currentTimeMillis();
String thisKey = redisObjectManager.get(lockkey);
if(null == thisKey || "".equals(thisKey))
LOGGER.info("========>" +ThreadName + "未取到锁,查询key失败");
return false;
Long oldTime = Long.valueOf(thisKey);
if(currentTime > oldTime)
//已超时,确认锁是否还在
currentTime = Long.valueOf(redisObjectManager.getSet(lockkey,currentTime+""));
if(currentTime == oldTime)
LOGGER.info("========>" +ThreadName + "取到锁,执行业务~");
return true;
LOGGER.info("========>" +ThreadName + "未取到锁,等待~");
return false;
protected void releaseLock(String methodName,String key)
String lockkey = methodName + key;
String ThreadName = Thread.currentThread().getName();
LOGGER.info("========>" +ThreadName + "释放锁~");
redisObjectManager.delKey(lockkey);
(加入日志是为了测试时更好的看到进行获取、争夺锁的情况)
调用:
public RemoteResult<Integer> saveCreditBills(Long orderId,String contractURL)
String methodName = "saveCreditBills";
Boolean ifHaslock = false;
String key = "";
RemoteResult<Integer> result = new RemoteResult<Integer>();
try
key = orderId + " | freezeCredit";
Boolean lock = getLock(methodName,key);
for(int i = 0;i <5 ;i++)
if(lock)
ifHaslock = true;
//成功获取锁,开始逻辑处理
break;
else
//获取锁失败,短暂睡眠后重试
Thread.sleep(500);
return result;
catch(BusinessException e)
return businessException(result,e,methodName);
catch(Exception e)
return encounterException(result,e,methodName);
finally
if(ifHaslock)//释放锁
releaseLock(methodName,key);
现在可以用多线程进行测试,你会看到类似如下的日志信息:
重点内容通过数据对比,发现数据是正常,而不用锁控制的话,多线程操作会发生数据错误。至此,redis实现分布式锁大功告成! ^ ^
以上是关于使用redis实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章