SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理
Posted 嘉禾嘉宁papa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理相关的知识,希望对你有一定的参考价值。
一、简介
我们知道在JDK 的 java.util.concurrent.locks包中提供了可重入锁,读写锁,及超时获取锁的方法等,但在分布式系统中,当多个应用需要共同操作某一个资源时,就没办法使用JDK里的锁实现了,所以今天的主角就是ZooKeeper + Curator 来完成分布式锁,Curator 提供的四种锁方案:
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
本文主要介绍可重入排它锁 InterProcessMutex 的相关使用及源码解读。
二、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.alian</groupId>
<artifactId>zookeeper-curator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zookeeper-curator</name>
<description>SpringBoot基于Zookeeper和Curator实现分布式锁</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、配置类
3.1、属性配置文件
# zookeeper服务器地址(ip+port)
zookeeper.server=10.130.3.16:2181
# 休眠时间
zookeeper.sleep-time=1000
# 最大重试次数
zookeeper.max-retries=3
# 会话超时时间
zookeeper.session-timeout=15000
# 连接超时时间
zookeeper.connection-timeout=5000
本机环境有限就不搭建集群了,具体还是在于curator分布式锁的使用及原理。
3.2、属性配置类
此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式
ZookeeperProperties.java
package com.alian.zookeepercurator.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper")
//读取指定路径配置文件,暂不支持*.yaml文件
@PropertySource(value = "classpath:config/zookeeper.properties", encoding = "UTF-8", ignoreResourceNotFound = true)
public class ZookeeperProperties {
/**
* zookeeper服务地址
*/
private String server;
/**
* 重试等待时间
*/
private int sleepTime;
/**
* 最大重试次数
*/
private int maxRetries;
/**
* session超时时间
*/
private int sessionTimeout;
/**
* 连接超时时间
*/
private int connectionTimeout;
}
3.3、ZookeeperConfig配置类(重要)
ZookeeperConfig.java
此配置类主要是使用CuratorFramework来连接zookeeper。
package com.alian.zookeepercurator.config;
import com.alian.zookeepercurator.common.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class ZookeeperConfig {
@Autowired
private ZookeeperProperties zookeeperProperties;
@Bean
public CuratorFramework curatorFrameworkClient() {
//重试策略,ExponentialBackoffRetry(1000,3)这里表示等待1s重试,最大重试次数为3次
RetryPolicy policy = new ExponentialBackoffRetry(zookeeperProperties.getSleepTime(), zookeeperProperties.getMaxRetries());
//构建CuratorFramework实例
CuratorFramework curatorFrameworkClient = CuratorFrameworkFactory
.builder()
.connectString(zookeeperProperties.getServer())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeout())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeout())
.retryPolicy(policy)
.build();
//启动实例
curatorFrameworkClient.start();
return curatorFrameworkClient;
}
//采用这种方式注册bean可以比较优雅的关闭连接
@Bean(destroyMethod = "destroy")
public ZookeeperClient zookeeperClient(CuratorFramework curatorFrameworkClient) {
return new ZookeeperClient(curatorFrameworkClient);
}
}
3.4、ZookeeperClient配置类(重要)
ZookeeperClient.java
这个bean是在上面的配置类里定义的,还定义了销毁的方法,这样的好处是,当服务断开后,可以关闭连接,如果直接关闭服务可能会抛出一个异常。使用和其他的使用是一样的,当然如果你为了方便,使用@Component也没有问题。
package com.alian.zookeepercurator.common;
import com.alian.zookeepercurator.lock.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
@Slf4j
public class ZookeeperClient {
private CuratorFramework curatorFramework;
public ZookeeperClient(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
}
public <T> T lock(AbstractLock<T> abstractLock) {
//获取锁路径
String lockPath = abstractLock.getLockPath();
//创建InterProcessMutex实例
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); //创建锁对象
boolean success = false;
try {
try {
//加锁
success = lock.acquire(abstractLock.getTime(), abstractLock.getTimeUnit()); //获取锁
} catch (Exception e) {
throw new RuntimeException("尝试获取锁异常:" + e.getMessage() + ", lockPath " + lockPath);
}
//判断是否加锁成功
if (success) {
return abstractLock.execute();
} else {
log.info("获取锁失败,返回null");
return null;
}
} finally {
try {
if (success) {
//释放锁
lock.release();
}
} catch (Exception e) {
log.error("释放锁异常: {}, lockPath {}", e.getMessage(), lockPath);
}
}
}
//bean的销毁方法
public void destroy() {
try {
log.info("ZookeeperClient销毁方法,如果zookeeper连接不为空,则关闭连接");
if (getCuratorFramework() != null) {
//这种方式比较优雅的关闭连接
getCuratorFramework().close();
}
} catch (Exception e) {
log.error("stop zookeeper client error {}", e.getMessage());
}
}
public CuratorFramework getCuratorFramework() {
return curatorFramework;
}
}
四、业务编写
4.1、抽象类AbstractLock
AbstractLock.java
定义一个抽象锁的类,包含锁路径,过期时间及时间单位,子类只需要实现execute方法即可。
package com.alian.zookeepercurator.common;
import java.util.concurrent.TimeUnit;
public abstract class AbstractLock<T> {
/**
* 锁路径
*/
protected String lockPath;
/**
* 超时时间
*/
protected long time;
protected TimeUnit timeUnit;
public AbstractLock(String lockPath, long time, TimeUnit timeUnit) {
this.lockPath = lockPath;
this.time = time;
this.timeUnit = timeUnit;
}
public void setLockPath(String lockPath) {
this.lockPath = lockPath;
}
public String getLockPath() {
return lockPath;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
/**
* 执行业务的方法
*
* @return
*/
public abstract T execute();
}
4.2、锁使实现(核心)
CuratorLockService.java
package com.alian.zookeepercurator.service;
import com.alian.zookeepercurator.common.ZookeeperClient;
import com.alian.zookeepercurator.common.AbstractLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class CuratorLockService {
@Autowired
private ZookeeperClient zookeeperClient;
@Autowired
private CuratorFramework curatorFramework;
//库存存取的路径
private static final String dataPath = "/root/data/stock";
//初始化库存的路径
private static final String initPath = "/root/init/stock";
/**
* 此方法系统启动执行,使用zookeeper存一个库存用于测试,这里也使用了锁。(只是一个模拟初始化库存的方法)
*/
@PostConstruct
public void init() {
zookeeperClient.lock(new AbstractLock<Boolean>(initPath, 20, TimeUnit.SECONDSSpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理
SpringBoot电商项目实战 — Zookeeper的分布式锁实现
SpringBoot2 整合 Zookeeper组件,管理架构中服务协调