分布式锁的实现方式
Posted 后端开发者中心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁的实现方式相关的知识,希望对你有一定的参考价值。
分布式锁的实现方式
分布式锁常见的三种实现方式
基于redis的分布式锁
基于zookeeper的分布式锁
数据库乐观锁
可靠性
为了确保分布式锁可用,我们要确保锁的实现满足以下四个条件:
互斥性。即在任意时刻,只有一个客户端能持有锁。
不会发生死锁。即使有一个客户端在持有锁期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
解铃还需系铃人。加锁和解锁必须是同一个客户端。
具有容错性[集群环境]。只要大部分的redis节点正常运行,客户端就可以加锁和解锁。
以下介绍比较常用的前两种实现方式,基于缓存redis以及zookeeper实现分布式锁。
1.基于缓存redis实现
组件依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
加锁代码
public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:
第一个为key,我们使用key来当锁,因为key是唯一的。
第二个为value,我们传的是requestId,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randonUUID().toString()方法生成。
第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,进行set操作;若key已经存在,则不做任何操作。
第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数呼应,代表key的过期时间。
执行上面的set()方法就只会导致两种结果:1.当前没有锁(key不存在),就进行加锁操作,并对锁设置有效期,同时value表示加锁的客户端。2.已有锁存在,不做任何操作。
我们加锁的代码满足可靠性里描述的三个条件:
set()加入了NX参数,保证如果已有key存在,则方法不会调用成功,即同一时间只有一个客户端能持有锁,满足互斥性。
由于对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁,不会发生死锁。
将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端解锁的时候就可以进行校验是否是同一个客户端。
本方法只考虑redis单机部署的场景,所以容错性暂不考虑。可以尝试使用Redisson实现。
解锁代码
public class RedisTool {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 释放分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
第一行代码写了一个简单的Lua脚本代码。第二行将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给redis服务端执行。 这段Lua代码的功能是,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。使用Lua语言实现可以保证上述操作是原子性的。执行eval()方法可以确保原子性。简单来说,就是在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,redis才会执行其他命令。
2.基于zookeeper实现
这里利用zookeeper的EPHEMERAL_SEQUENTIAL(临时顺序)类型节点及watcher机制,来简单实现分布式锁。
主要思想:
开启10个线程,在disLocks节点下各自创建名为sub的EPHEMERAL_SEQUENTIAL节点;
获取disLocks节点下所有子节点,排序,如果自己的节点编号最小,则获取锁;
否则watch排在自己前面的节点,监听到其删除后,进入第2步(重新检测排序是防止监听的节点发生连接失效,导致的节点删除情况);
删除自身sub节点,释放连接。
tip:zookeeper的4种类型节点
PERSISTENT -- 持久节点,节点创建后会一直存在,不会因为客户端会话失效而删除;
PERSISTENT_SEQUENTIAL -- 持久顺序节点,zk会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;
EPHEMERAL -- 临时节点,客户端会话失效或连接关闭后,该节点会被自动删除,且不能在临时节点下创建子节点;
EPHEMERAL_SEQUENTIAL -- 临时顺序节点,特性同上。
代码实现
public class DistributedLock implements Watcher {
private int threadId;
private ZooKeeper zk = null;
private String selfPath;
private String waitPath;
private String LOG_PREFIX_OF_THREAD;
private static final int SESSION_TIMEOUT = 10000;
private static final String GROUP_PATH = "/disLocks";
private static final String SUB_PATH = "/disLocks/sub";
private static final String CONNECTION_STRING = "192.168.136.86:2181";
private static final int THREAD_NUM = 10;
//确保连接zk成功;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
//确保所有线程运行结束;
private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);
public DistributedLock(int id) {
this.threadId = id;
LOG_PREFIX_OF_THREAD = "【第"+threadId+"个线程】";
}
public static void main(String[] args) {
for(int i=0; i < THREAD_NUM; i++){
final int threadId = i+1;
new Thread(){
@Override
public void run() {
try{
DistributedLock dc = new DistributedLock(threadId);
dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
//GROUP_PATH不存在的话,由一个线程创建即可;
//可不加锁,zk创建节点是阻塞的,无并发问题
//synchronized (threadSemaphore){
dc.createPath(GROUP_PATH, "该节点由线程" + threadId + "创建", true);
//}
dc.getLock();
} catch (Exception e){
LOG.error("【第"+threadId+"个线程】 抛出的异常:");
e.printStackTrace();
}
}
}.start();
}
try {
threadSemaphore.await();
LOG.info("所有线程运行结束!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取锁
* @return
*/
private void getLock() throws KeeperException, InterruptedException {
selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);
if(checkMinPath()){
getLockSuccess();
}
}
/**
* 创建节点
* @param path 节点path
* @param data 初始数据内容
* @return
*/
public boolean createPath( String path, String data, boolean needWatch) throws KeeperException, InterruptedException {
if(zk.exists(path, needWatch)==null){
LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "
+ this.zk.create( path,
data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT )
+ ", content: " + data );
}
return true;
}
/**
* 创建ZK连接
* @param sessionTimeout Session超时时间
*/
public void createConnection( String connectString, int sessionTimeout ) throws IOException, InterruptedException {
zk = new ZooKeeper( connectString, sessionTimeout, this);
connectedSemaphore.await();
}
/**
* 获取锁成功
*/
public void getLockSuccess() throws KeeperException, InterruptedException {
if(zk.exists(this.selfPath,false) == null){
LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");
return;
}
LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");
Thread.sleep(2000);
LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);
zk.delete(this.selfPath, -1);
releaseConnection();
threadSemaphore.countDown();
}
/**
* 关闭ZK连接
*/
public void releaseConnection() {
if ( this.zk !=null ) {
try {
this.zk.close();
} catch ( InterruptedException e ) {}
}
LOG.info(LOG_PREFIX_OF_THREAD + "释放连接");
}
/**
* 检查自己是不是最小的节点
* @return
*/
public boolean checkMinPath() throws KeeperException, InterruptedException {
List<String> subNodes = zk.getChildren(GROUP_PATH, false);
Collections.sort(subNodes);
int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));
switch (index){
case -1:{
LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);
return false;
}
case 0:{
LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);
return true;
}
default:{
this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);
LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);
try{
zk.getData(waitPath, true, new Stat());
return false;
}catch(KeeperException e){
if(zk.exists(waitPath,false) == null){
LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");
return checkMinPath();
}else{
throw e;
}
}
}
}
}
public void process(WatchedEvent event) {
if(event == null){
return;
}
Event.KeeperState keeperState = event.getState();
Event.EventType eventType = event.getType();
if ( Event.KeeperState.SyncConnected == keeperState) {
if ( Event.EventType.None == eventType ) {
LOG.info( LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器" );
connectedSemaphore.countDown();
}else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
LOG.info(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");
try {
if(checkMinPath()){
getLockSuccess();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else if ( Event.KeeperState.Disconnected == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接" );
}/* else if ( Event.KeeperState.AuthFailed == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + "权限检查失败" );
}*/ else if ( Event.KeeperState.Expired == keeperState ) {
LOG.info( LOG_PREFIX_OF_THREAD + "会话失效" );
}
}
}
以上是关于分布式锁的实现方式的主要内容,如果未能解决你的问题,请参考以下文章