20分钟带您ZooKeeper 分布式锁设计实战
Posted 栗子~~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了20分钟带您ZooKeeper 分布式锁设计实战相关的知识,希望对你有一定的参考价值。
文章目录
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
20分钟带您熟悉ZooKeeper 分布式锁设计
01 ZooKeeper 的基础概念
02 ZooKeeper 分布式锁设计原理
通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。
03 ZooKeeper 分布式锁整体设计流程
04 ZooKeeper节点的数据结构与watcher监听器机制
05 实战:
05::01 依赖
<!-- zookeeper start-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
<!-- 4.0.0原生不兼容zk 3.4, 必须进行兼容性处理 -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- zookeeper end-->
05::02 ZooKeeper 配置
zk:
host: 127.0.0.1:2181 #ZooKeeper连接
namespace: yzy_distributeLock #命名空间只是用来区别不同的项目
/**
* zk 配置
* @yangzhenyu
*
* */
@Configuration
@ConfigurationProperties(prefix = "zk", ignoreUnknownFields = true)
public class ZookeeperConfig {
//zk 连接
@Value("127.0.0.1:2181")
private String host;
//命名空间只是用来区别不同的项目
@Value("yzy_distributeLock")
private String namespace;
private static Logger log = LoggerFactory.getLogger(ZookeeperConfig.class);
public ZookeeperConfig() {
log.info("=================== Zookeeper 配置===================");
}
/**
* 创建CuratorFramework实例
* 01 采用工厂模式进行创建
* 02 指定客户端连接zk的策略:重试机制(5次、每次间隔1秒)
* */
@Bean
public CuratorFramework curatorFramework(){
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.namespace(namespace)
.retryPolicy(new RetryNTimes(5,1000))
.build();
curatorFramework.start();
return curatorFramework;
}
}
05::03 自定义异常类
/**
* 自定义异常
* @author yangzhenyu
* */
public class MyException extends RuntimeException {
private String code;
public String getCode() {
return code;
}
public MyException(Exception e) {
super(e);
}
public MyException(ExceptionEnumService enums) {
super(enums.getDesc());
this.code = enums.getCode();
}
public MyException(ExceptionEnumService enums, String msg, String... strings) {
super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings));
this.code = enums.getCode();
}
public MyException(ExceptionEnumService enums, Throwable e, String... strings) {
super(String.format(enums.getDesc(), strings), e);
this.code = enums.getCode();
}
public MyException(ExceptionEnumService enums, String msg, Throwable e, String... strings) {
super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings), e);
this.code = enums.getCode();
}
}
/**
* @author yangzhenyu
* */
public enum ExceptionEnum implements ExceptionEnumService{
DISTRIBUTED_SEESION_ERROR("DISTRIBUTED_SEESION_ERROR","分布式会话出现错误 失败");
/**
* 枚举编码
*/
private String code;
/**
* 枚举说明
*/
private String desc;
ExceptionEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDesc() {
return this.desc;
}
public static String getDesc(String code){
if(StringUtils.isEmpty(code)) {
return null;
}
List<String> list = new ArrayList<>();
Arrays.stream(ExceptionEnum.values()).filter(t->t.code.equals(code)).forEach(v->{
list.add(v.desc);
});
return list.size()>0?list.get(0):"";
}
}
05::04 返回vo
/**
* @author yangzhenyu
* */
public class ResponseBo extends HashMap<String, Object> {
private static final long serialVersionUID = -8713837118340960775L;
// 成功
private static final Integer SUCCESS = 200;
// 警告
private static final Integer WARN = 1;
// 异常 失败
private static final Integer FAIL = 500;
public ResponseBo() {
put("code", SUCCESS);
put("msg", "操作成功");
}
public static ResponseBo error(Object msg) {
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", FAIL);
ResponseBo.put("msg", msg);
return ResponseBo;
}
public static ResponseBo error(String code ,String msg,String data) {
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ? ExceptionConfig.code : code);
ResponseBo.put("msg", StringUtils.isEmpty(msg) ? ExceptionConfig.info : msg);
ResponseBo.put("data", data);
return ResponseBo;
}
public static ResponseBo error(String code ,String msg) {
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ? ExceptionConfig.code : code);
ResponseBo.put("msg", StringUtils.isEmpty(msg) ? ExceptionConfig.info : msg);
return ResponseBo;
}
public static ResponseBo warn(Object msg) {
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", WARN);
ResponseBo.put("msg", msg);
return ResponseBo;
}
public static ResponseBo ok(Object data) {
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", SUCCESS);
ResponseBo.put("data", data);
return ResponseBo;
}
public static ResponseBo ok() {
return new ResponseBo();
}
public static ResponseBo error() {
return ResponseBo.error("");
}
@Override
public ResponseBo put(String key, Object value) {
super.put(key, value);
return this;
}
}
05::05 分布式锁设计
/**
* @author yangzhenyu
* 日志模板
* */
public abstract class BaseLog implements BaseLogInterface{
private static Logger log = LoggerFactory.getLogger(BaseLog.class);
private ReentrantLock lock = new ReentrantLock();
private String clazz;
public void setClazz(String clazz) {
this.clazz = clazz;
}
/**
* 打印错误日志
* */
public void error(String msg){
try{
lock.lock();
log.error(msg);
}finally {
lock.unlock();
}
}
/**
* 打印普通日志
* */
public void info(String msg){
try{
lock.lock();
log.info(msg);
}finally {
lock.unlock();
}
}
/**
* 返回时间日志
* */
public final void endLog(String msg,long startTime){
try{
lock.lock();
long endTime = System.currentTimeMillis();
log.info("<======执行源:【{}】==> {}执行结束,耗时:{}秒 =======>",clazz,msg,((endTime-startTime)/1000));
}finally {
lock.unlock();
}
}
/**
* 返回时间错误日志
* */
public final void endLogError(String msg,long startTime,Throwable e){
try{
lock.lock();
long endTime = System.currentTimeMillis();
log.error(String.format("<======执行源:【%s】==> %s执行结束,耗时:%s秒 =======>",clazz,msg,((endTime-startTime)/1000)),e);
}finally {
lock.unlock();
}
}
/**
* 返回开始时间
* */
public final long startLog(String msgKey,String param){
try{
lock.lock();
log.info("<======执行源:【{}】==>方法[{}}]开始执行 =======>",clazz,msgKey);
log.info("<======参数:【{}】 =======>",param);
return System.currentTimeMillis();
}finally {
lock.unlock();
}
}
}
/**
* @author yangzhenyu
* */
@Api(value = "ZooKeeper 练习", tags = {"ZooKeeper 练习"})
@RestController
@RequestMapping(value="api/zk")
public class ZKController extends BaseLog {
@Override
public long init(String name, String param) {
this.setClazz("ZKController");
return startLog(name,param);
}
/**
* ZK客户端CuratorFramework实例
* */
@Autowired
private CuratorFramework curatorFramework;
/**
* 分布式锁设计原理
* 通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。
*
* 而ZKnode节点将对应一个具体的路径-跟Unix文件夹路径类似 ,需要以/开头
* */
private static final String PATH = "/yzy/zklock/";
/**
* ZooKeeper分布式锁设计
* */
@ApiOperation(value = "ZooKeeper分布式锁设计",notes = "ZooKeeper分布式锁设计")
@ResponseBody
@PostMapping("/lock")
public ResponseBo lock(String key) throws Exception {
String msgValue = "lock";
long startTime = init(msgValue, key);
/**
* 创建zk互斥锁组件实例
* */
InterProcessMutex mutex = new InterProcessMutex(curatorFramework,PATH+ UUID.randomUUID()+"-lock");
try{
/**
* 采用互斥锁组件获取分布式锁,尝试最大时间为10秒
* */
if (mutex.acquire(10L, TimeUnit.SECONDS)) {
//TODO : 真正的核心处理逻辑
this.info("获取到分布式锁");
this.info("执行业务逻辑");
}else {
this.error("获取zk分布式锁失败");
throw new MyException(ExceptionEnum.LOCK_ERROR);
}
endLog(msgValue,startTime);
return ResponseBo.ok();
}catch (MyException m){
endLogError(msgValue,startTime,m);
return ResponseBo.error("失败:" + ExceptionEnum.getDesc(m.getCode()));
}catch (Exception e){
endLogError(msgValue,startTime,e);
return ResponseBo.error("失败" + e.getMessage());
}finally {
//释放锁
mutex.release();
}
}
}
05::06 测试
启动项目->点击访问
结果:
测试通过!!!
以上是关于20分钟带您ZooKeeper 分布式锁设计实战的主要内容,如果未能解决你的问题,请参考以下文章