Spring Boot基于zookeeper原生方式实现分布式锁
Posted 嘉禾嘉宁papa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot基于zookeeper原生方式实现分布式锁相关的知识,希望对你有一定的参考价值。
目录
一、背景
我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理详细介绍了它的使用及其原理,现在我们也根据这个思路,用zookeeper原生的方式来实现一个分布式锁,加深对分布式锁的理解。本文中Spring Boot的版本是2.5.2,zookeeper的版本是3.6.3。
我们大致的大致的流程图如下图,可作为我们查看代码的一个思路,不然看的头大。(当然本图是没有包含可重入锁的流程判断在里面的)
二、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>zklock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zklock</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!--主要用于Maps.newConcurrentMap()-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、配置
3.1、application.yml配置
application.yml
server:
port: 8082
servlet:
context-path: /zklock
app:
zookeeper:
server: 10.130.3.16:2181
session-timeout: 15000
#这里配置的路径没有用"/"结尾
root-lock-path: /root/alian
3.2、属性配置类
此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式
AppProperties.java
package com.alian.zklock.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "app.zookeeper")
public class AppProperties {
/**
* zookeeper服务地址
*/
private String server;
/**
* session超时时间
*/
private int sessionTimeout;
/**
* 分布式锁路径
*/
private String rootLockPath;
}
3.3、ZookeeperConfig配置件
ZookeeperConfig.java
package com.alian.zklock.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Configuration
public class ZookeeperConfig {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private AppProperties appProperties;
@Bean
public ZooKeeper zookeeper() throws Exception {
ZooKeeper zookeeper = new ZooKeeper(appProperties.getServer(), appProperties.getSessionTimeout(), event -> {
log.info("Receive watched event: {}", event.getState());
//获取事件的状态
KeeperState keeperState = event.getState();
//获取时间类型
EventType eventType = event.getType();
//如果是建立连接
if (KeeperState.SyncConnected == keeperState) {
if (EventType.None == eventType) {
//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
countDownLatch.countDown();
log.info("zookeeper建立连接");
}
}
});
//进行阻塞,当执行countDownLatch.countDown();后续代码才会进行
countDownLatch.await();
return zookeeper;
}
}
这里主要是对ZooKeeper 进行连接配置,关于CountDownLatch的使用,本文最后有相关的介绍。
四、实战
定义了两个方法:加锁和释放锁。
4.1、接口
ILockService.java
package com.alian.zklock.service;
import java.util.concurrent.TimeUnit;
public interface ILockService {
/**
* 加锁
*
* @param lockPath
* @param time
* @param unit
* @return
*/
boolean lock(String lockPath, long time, TimeUnit unit);
/**
* 释放锁
*
* @return
*/
void release();
}
4.2、接口核心实现
这个实现类的注释,我想已经很详细了。可以细细阅读,可以加深你对zookeeper分布式锁实现原理的理解。
ZookeeperLockService.java
package com.alian.zklock.service.impl;
import com.alian.zklock.service.ILockService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class ZookeeperLockService implements ILockService {
//依赖需要导入:<groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version>
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
@Autowired
private ZooKeeper zooKeeper;
//好的思想直接拿来用
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
//构造方法
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
/**
* 加锁
*
* @param lockPath
* @return
* @throws Exception
*/
public boolean lock(String lockPath, long time, TimeUnit unit) {
//可重入,确保同一线程,可以重复加锁
Thread currentThread = Thread.currentThread();
//根据线程号获取线程锁数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
// 说明该线程已加锁过,直接放行
lockData.lockCount.incrementAndGet();
return true;
}
String currentLockPath = attemptLock(lockPath, time, unit);
//如果不为空则表示获取到了锁
if (StringUtils.isNotBlank(currentLockPath)) {
//把数据缓存起来
LockData newLockData = new LockData(currentThread, currentLockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
/**
* 尝试获取锁,获取成功返回锁路径
*
* @param lockPath
* @param time
* @param unit
* @return
*/
public String attemptLock(String lockPath, long time, TimeUnit unit) {
//创建临时有序节点,传入的lockPath没有"/"
try {
String currentLockPath = zooKeeper.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("线程:【{}】->【{}】尝试竞争锁", Thread.currentThread().getName(), currentLockPath);
//创建临时节点失败
if (StringUtils.isBlank(currentLockPath)) {
throw new Exception("生成临时节点异常");
}
//检查当前节点是否获取到了锁
boolean hasLock = checkLocked(lockPath, currentLockPath, time, unit);
//获取到了锁则返回锁节点路径
return hasLock ? currentLockPath : null;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 检查是否获取到锁
*
* @param lockPath
* @param currentLockPath
* @param time
* @param unit
* @return
* @throws Exception
*/
public boolean checkLocked(String lockPath, String currentLockPath, long time, TimeUnit unit) {
boolean hasLock = false;
boolean toDelete = false;
try {
while (!hasLock) {
//检查是否获取到了锁,没有获取到则返回前一个节点
Pair<Boolean, String> pair = getsTheLock(lockPath, currentLockPath);
//当前节点是否获取到了锁
boolean currentLock = pair.getLeft();
//获取前一个节点
String preSequencePath = pair.getRight();
if (currentLock) {
//获取到了锁
hasLock = true;
} else {
//等待
final CountDownLatch latch = new CountDownLatch(1);
//订阅比自己次小顺序节点的删除事件
Watcher watcher = watchedEvent -> {
log.info("监听到的变化【】 watchedEvent = {}", watchedEvent);
latch.countDown();
};
Stat stat = zooKeeper.exists(preSequencePath, watcher);
if (stat != null) {
log.info("线程:【{}】等待锁【{}】释放", Thread.currentThread().getName(), preSequencePath);
boolean await = latch.await(time, unit);
if (!await) {
//说明超时了
log.info("获取锁超时");
toDelete = true;
break;
}
}
//检查锁
Pair<Boolean, String> checkPair = getsTheLock(lockPath, currentLockPath);
if (checkPair.getLeft()) {
hasLock = true;
}
}
}
} catch (Exception e) {
log.error("检查是否获取到锁异常", e);
if (e instanceof InterruptedException) {
toDelete = true;
}
} finally {
if (toDelete) {
deleteCurrentPath(currentLockPath);
}
}
return hasLock;
}
/**
* 检测是否已经获取到了锁,没有获取到则返回前一个节点
*
* @param lockPath
* @param currentLock
* @return
* @throws Exception
*/
private Pair<Boolean, String> getsTheLock(Stri以上是关于Spring Boot基于zookeeper原生方式实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章
(031)Spring Boot之服务的注册与发现,使用zookeeper演示负载均衡
(031)Spring Boot之服务的注册与发现,使用zookeeper演示负载均衡
ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用
通过dubbo暴露接口调用方法,及基于zookeeper的dubbo涉及配置文件