Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁
Posted 是否恐惧安逸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁相关的知识,希望对你有一定的参考价值。
基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁
在介绍使用Zookeeper实现分布式锁之前,首先看当前的系统架构图
解释: 左边的整个区域表示一个Zookeeper集群,locker是Zookeeper的一个持久节点,node_1、node_2、node_3是locker这个持久节点下面的临时顺序节点。client_1、client_2、client_n表示多个客户端,Service表示需要互斥访问的共享资源。
分布式锁获取思路
1.获取分布式锁的总体思路
在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。
客户端调用createNode方法在locker下创建临时顺序节点,
然后调用getChildren(“locker”)来获取locker下面的所有子节点,注意此时不用设置任何Watcher。
客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小,那么就认为该客户端获取到了锁。
如果发现自己创建的节点并非locker所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,
然后对其调用exist()方法,同时对其注册事件监听器。之后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,
此时再次判断自己创建的节点是否是locker子节点中序号最小的,如皋是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
当前这个过程中还需要许多的逻辑判断。
2.获取分布式锁的核心算法流程
下面同个一个流程图来分析获取分布式锁的完整算法,如下:
现在我们选用 ZooKeeper + Curator 来完成分布式锁
先导入maven依赖 redission
<!-- curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
Spring 配置
<!-- Curator 客户端重试策略 -->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!-- 间隔时间基数 初始休眠时间为 1000ms-->
<constructor-arg index="0" value="1000" />
<!-- 重连策略 最大重试次数为 3-->
<constructor-arg index="1" value="3" />
</bean>
<!-- Curator 客户端对象 -->
<bean id="curatorFramework1" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<constructor-arg index="0" value="192.168.8.128:2181" />
<!-- sessionTimeoutMs会话超时时间,单位为毫秒。默认是60000ms -->
<constructor-arg index="1" value="5000" />
<!-- connectionTimeoutMs连接创建超时时间,单位毫秒,默认15000ms -->
<constructor-arg index="2" value="3000" />
<constructor-arg index="3" ref="retryPolicy" />
</bean>
<!-- Curator 客户端对象 用户模拟其他客户端-->
<bean id="curatorFramework2" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<constructor-arg index="0" value="192.168.8.128:2181" />
<!-- sessionTimeoutMs会话超时时间,单位为毫秒。默认是60000ms -->
<constructor-arg index="1" value="5000" />
<!-- connectionTimeoutMs连接创建超时时间,单位毫秒,默认15000ms -->
<constructor-arg index="2" value="3000" />
<constructor-arg index="3" ref="retryPolicy" />
</bean>
测试类
/**
* @作者 xuruzheng
* @创建日期 2017/9/26
* @描述
* @版本 V 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:spring/spring-service.xml"})
public class CuratorDemo {
// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
private static final String lockPath = "/distributed-lock";
@Autowired
private ThreadPoolTaskExecutor threadPool;
@Resource(name = "curatorFramework1")
private CuratorFramework curatorFramework1;
@Resource(name = "curatorFramework2")
private CuratorFramework curatorFramework2;
// 原子变量
private static AtomicInteger serializable=new AtomicInteger();
public static Integer getAtomicInteger() {
return serializable.getAndIncrement();
}
@Test
public void shareLock2() throws Exception {
// 创建共享锁
InterProcessLock lock = new InterProcessSemaphoreMutex(curatorFramework1, lockPath);
// lock2 用于模拟其他客户端
InterProcessLock lock2 = new InterProcessSemaphoreMutex(curatorFramework2, lockPath);
for (int i = 0; i < 20; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Integer atomicInteger = CuratorDemo.getAtomicInteger();
if(atomicInteger/2 == 0){
// 获取锁对象
lock.acquire(100, TimeUnit.SECONDS);
//处理业务逻辑
System.out.println(atomicInteger+"\t"+Thread.currentThread().getName());
// 释放锁
lock.release();
}else{
// 获取锁对象
lock2.acquire(100, TimeUnit.SECONDS);
//处理业务逻辑
System.out.println(atomicInteger+"\t"+Thread.currentThread().getName());
// 释放锁
lock2.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
System.in.read();
}
}
当然,我们使用zkclient也能实现分布式锁(废弃版,关注原理,不实际操作,推荐使用curator),具体如下:
1.定义分布式锁接口
/**
* @作者 xuruzheng
* @创建日期 2017/9/22
* @描述 1.定义分布式锁接口
* @版本 V 1.0
*/
public interface DistributedZkLock {
/**获取锁,如果没有得到就等待*/
public void acquire() throws Exception;
/**
* 获取锁,直到超时
* time超时时间
* time参数的单位
* @return是否获取到锁
*/
public boolean acquire (long time, TimeUnit unit) throws Exception;
/**
* 释放锁
* @throws Exception
*/
public void release() throws Exception;
}
2.定义一个简单的互斥锁
/**
* @作者 xuruzheng
* @创建日期 2017/9/22
* @描述 定义一个互斥锁类,实现以上定义的锁接口,同时继承一个基类BaseDistributedLock,
* 该基类主要用于与Zookeeper交互,包含一个尝试获取锁的方法和一个释放锁。
* @版本 V 1.0
*/
public class SimpleDistributedZkLockMutex extends BaseDistributedZkLock implements DistributedZkLock{
/*用于保存Zookeeper中实现分布式锁的节点,如名称为 locker:/locker,
*该节点应该是持久节点,在该节点下面创建临时顺序节点来实现分布式锁 */
private final String basePath;
/*锁名称前缀,locker下创建的顺序节点例如都以lock-开头,这样便于过滤无关节点
*这样创建后的节点类似:lock-00000001,lock-000000002*/
private static final String LOCK_NAME ="lock-";
/*用于保存某个客户端在locker下面创建成功的顺序节点,用于后续相关操作使用(如判断)*/
private String ourLockPath;
/**
* 传入Zookeeper客户端连接对象,和basePath
* @param client Zookeeper客户端连接对象
* @param basePath basePath是一个持久节点
*/
public SimpleDistributedZkLockMutex(ZkClient client, String basePath){
/*调用父类的构造方法在Zookeeper中创建basePath节点,并且为basePath节点子节点设置前缀
*同时保存basePath的引用给当前类属性*/
super(client,basePath,LOCK_NAME);
this.basePath = basePath;
}
/**获取锁,直到超时,超时后抛出异常*/
public void acquire() throws Exception {
//-1表示不设置超时时间,超时由Zookeeper决定
if (!internalLock(-1,null)){
throw new IOException("连接丢失!在路径:'"+basePath+"'下不能获取锁!");
}
}
/**
* 获取锁,带有超时时间
*/
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
/**
* 用于获取锁资源,通过父类的获取锁方法来获取锁
* time获取锁的超时时间
* @param unit time的时间单位
* @return是否获取到锁
* @throws Exception
*/
private boolean internalLock (long time, TimeUnit unit) throws Exception {
//如果ourLockPath不为空则认为获取到了锁,具体实现细节见attemptLock的实现
ourLockPath = attemptLock(time, unit);
return ourLockPath !=null;
}
/**释放锁*/
public void release()throws Exception {
releaseLock(ourLockPath);
}
}
3. 分布式锁的实现细节
/**
* @作者
* @创建日期 2017/9/22
* @描述 获取分布式锁的重点逻辑在于BaseDistributedZkLock,实现了基于Zookeeper实现分布式锁的细节。
* @版本 V 1.0
*/
public class BaseDistributedZkLock {
private final ZkClient client;
private final String path;
//zookeeper中locker节点的路径
private final String basePath;
private final String lockName;
private static final Integer MAX_RETRY_COUNT = 10;
public BaseDistributedZkLock(ZkClient client, String path, String lockName){
this.client = client;
this.basePath = path;
this.path = path.concat("/").concat(lockName);
this.lockName = lockName;
}
private void deleteOurPath(String ourPath) throws Exception{
client.delete(ourPath);
}
private String createLockNode(ZkClient client, String path) throws Exception{
// 没有主节点创建主节点
getSortedChildren();
return client.createEphemeralSequential(path, null);
}
/**
* 获取锁的核心方法
*/
private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception{
boolean haveTheLock = false;
boolean doDelete = false;
try{
while ( !haveTheLock ) {
//该方法实现获取locker节点下的所有顺序节点,并且从小到大排序
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length()+1);
//计算刚才客户端创建的顺序节点在locker的所有子节点中排序位置,如果是排序为0,则表示获取到了锁
int ourIndex = children.indexOf(sequenceNodeName);
/*如果在getSortedChildren中没有找到之前创建的[临时]顺序节点,这表示可能由于网络闪断而导致
*Zookeeper认为连接断开而删除了我们创建的节点,此时需要抛出异常,让上一级去处理
*上一级的做法是捕获该异常,并且执行重试指定的次数 见后面的 attemptLock方法 */
if ( ourIndex<0 ){
throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
}
//如果当前客户端创建的节点在locker子节点列表中位置大于0,表示其它客户端已经获取了锁
//此时当前客户端需要等待其它客户端释放锁,
boolean isGetTheLock = ourIndex == 0;
//如何判断其它客户端是否已经释放了锁?从子节点列表中获取到比自己次小的哪个节点,并对其建立监听
String pathToWatch = isGetTheLock ? null : children.get(ourIndex - 1);
if ( isGetTheLock ){
haveTheLock = true;
}else{
//如果次小的节点被删除了,则表示当前客户端的节点应该是最小的了,所以使用CountDownLatch来实现等待
String previousSequencePath = basePath .concat( "/" ) .concat( pathToWatch );
final CountDownLatch latch = new CountDownLatch(1);
final IZkDataListener previousListener = new IZkDataListener() {
//次小节点删除事件发生时,让countDownLatch结束等待
//此时还需要重新让程序回到while,重新判断一次!
public void handleDataDeleted(String dataPath) throws Exception {
latch.countDown();
}
public void handleDataChange(String dataPath, Object data) throws Exception {
// ignore
}
};
try{
// 监听节点 如果节点不存在会出现异常
client.subscribeDataChanges(previousSequencePath, previousListener);
if ( millisToWait != null ){
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ){
doDelete = true; // timed out - delete our node
break;
}
// 让线程等待,直到过期时间
latch.await(millisToWait, TimeUnit.MICROSECONDS);
}else{
// 让线程等待
latch.await();
}
}catch ( ZkNoNodeException e ){
//ignore
}finally{
// 解除监听节点
client.unsubscribeDataChanges(previousSequencePath, previousListener);
}
}
}
}catch ( Exception e ){
//发生异常需要删除节点
doDelete = true;
throw e;
}finally{
//如果需要删除节点
if ( doDelete ){
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private String getLockNodeNumber(String str, String lockName) {
int index = str.lastIndexOf(lockName);
if ( index >= 0 ){
index += lockName.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
private List<String> getSortedChildren() throws Exception {
try{
List<String> children = client.getChildren(basePath);
Collections.sort(
children,
new Comparator<String>(){
public int compare(String lhs, String rhs){
return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
}
}
);
return children;
}catch(ZkNoNodeException e){
// createParents参数决定了是否递归创建父节点。true表示递归创建,false表示不使用递归创建
client.createPersistent(basePath, true);
return getSortedChildren();
}
}
protected void releaseLock(String lockPath) throws Exception{
deleteOurPath(lockPath);
}
protected String attemptLock(long time, TimeUnit unit) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
int retryCount = 0;
//网络闪断需要重试一试
while (!isDone) {
isDone = true;
try {
//createLockNode用于在locker(basePath持久节点)下创建客户端要获取锁的[临时]顺序节点
ourPath = createLockNode(client, path);
/**
* 该方法用于判断自己是否获取到了锁,即自己创建的顺序节点在locker的所有子节点中是否最小
* 如果没有获取到锁,则等待其它客户端锁的释放,并且稍后重试直到获取到锁或者超时
*/
hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
} catch (ZkNoNodeException e) {
if (retryCount++ < MAX_RETRY_COUNT) {
isDone = false;
} else {
throw e;
}
}
}
if (hasTheLock) {
return ourPath;
}
return null;
}
}
4. 测试
/**
* @作者
* @创建日期 2017/9/22
* @描述
* @版本 V 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:spring/spring-service.xml"})
public class BaseZkDemo {
@Autowired
private ThreadPoolTaskExecutor threadPool;
/**
* 废弃版,关注原理,不使用实际操作,推荐使用curator
* @throws Exception
*/
@Test
public void testZkDistributedLock() throws Exception {
ZkClient zkClient = new ZkClient("192.168.8.128:2181", 5000, 5000, new BytesPushThroughSerializer());
System.out.println("ZK 成功建立连接!");
SimpleDistributedZkLockMutex zkLock = new SimpleDistributedZkLockMutex(zkClient, "/Mutex");
zkLock.acquire(100, TimeUnit.SECONDS);
}
}
以上是关于Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁的主要内容,如果未能解决你的问题,请参考以下文章
分布式锁三种实现方式(数据库实现,缓存Redis等,Zookeeper)