ZooKeeper 分布式锁设计实战
Posted 栗子~~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper 分布式锁设计实战相关的知识,希望对你有一定的参考价值。
文章目录
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
ZooKeeper 分布式锁设计
01 ZooKeeper 的基础概念
02 ZooKeeper 分布式锁设计原理
通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。
03 ZooKeeper 分布式锁整体设计流程
04 ZooKeeper节点的数据结构与watcher监听器机制
05 实战:
05::01 依赖
<!-- zookeeper start-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
<!-- 4.0.0原生不兼容zk 3.4, 必须进行兼容性处理 -->
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- zookeeper end-->
05::02 ZooKeeper 配置
zk:
host: 127.0.0.1:2181 #ZooKeeper连接
namespace: yzy_distributeLock #命名空间只是用来区别不同的项目
/**
* zk 配置
* @yangzhenyu
*
* */
@Configuration
@ConfigurationProperties(prefix = "zk", ignoreUnknownFields = true)
public class ZookeeperConfig
//zk 连接
@Value("127.0.0.1:2181")
private String host;
//命名空间只是用来区别不同的项目
@Value("yzy_distributeLock")
private String namespace;
private static Logger log = LoggerFactory.getLogger(ZookeeperConfig.class);
public ZookeeperConfig()
log.info("=================== Zookeeper 配置===================");
/**
* 创建CuratorFramework实例
* 01 采用工厂模式进行创建
* 02 指定客户端连接zk的策略:重试机制(5次、每次间隔1秒)
* */
@Bean
public CuratorFramework curatorFramework()
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(host)
.namespace(namespace)
.retryPolicy(new RetryNTimes(5,1000))
.build();
curatorFramework.start();
return curatorFramework;
05::03 自定义异常类
/**
* 自定义异常
* @author yangzhenyu
* */
public class MyException extends RuntimeException
private String code;
public String getCode()
return code;
public MyException(Exception e)
super(e);
public MyException(ExceptionEnumService enums)
super(enums.getDesc());
this.code = enums.getCode();
public MyException(ExceptionEnumService enums, String msg, String... strings)
super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings));
this.code = enums.getCode();
public MyException(ExceptionEnumService enums, Throwable e, String... strings)
super(String.format(enums.getDesc(), strings), e);
this.code = enums.getCode();
public MyException(ExceptionEnumService enums, String msg, Throwable e, String... strings)
super(String.format(StringUtils.isEmpty(msg) ? enums.getDesc() : msg, strings), e);
this.code = enums.getCode();
/**
* @author yangzhenyu
* */
public enum ExceptionEnum implements ExceptionEnumService
DISTRIBUTED_SEESION_ERROR("DISTRIBUTED_SEESION_ERROR","分布式会话出现错误 失败");
/**
* 枚举编码
*/
private String code;
/**
* 枚举说明
*/
private String desc;
ExceptionEnum(String code, String desc)
this.code = code;
this.desc = desc;
@Override
public String getCode()
return this.code;
@Override
public String getDesc()
return this.desc;
public static String getDesc(String code)
if(StringUtils.isEmpty(code))
return null;
List<String> list = new ArrayList<>();
Arrays.stream(ExceptionEnum.values()).filter(t->t.code.equals(code)).forEach(v->
list.add(v.desc);
);
return list.size()>0?list.get(0):"";
05::04 返回vo
/**
* @author yangzhenyu
* */
public class ResponseBo extends HashMap<String, Object>
private static final long serialVersionUID = -8713837118340960775L;
// 成功
private static final Integer SUCCESS = 200;
// 警告
private static final Integer WARN = 1;
// 异常 失败
private static final Integer FAIL = 500;
public ResponseBo()
put("code", SUCCESS);
put("msg", "操作成功");
public static ResponseBo error(Object msg)
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", FAIL);
ResponseBo.put("msg", msg);
return ResponseBo;
public static ResponseBo error(String code ,String msg,String data)
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ? ExceptionConfig.code : code);
ResponseBo.put("msg", StringUtils.isEmpty(msg) ? ExceptionConfig.info : msg);
ResponseBo.put("data", data);
return ResponseBo;
public static ResponseBo error(String code ,String msg)
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", code == null || StringUtils.isEmpty(code) ? ExceptionConfig.code : code);
ResponseBo.put("msg", StringUtils.isEmpty(msg) ? ExceptionConfig.info : msg);
return ResponseBo;
public static ResponseBo warn(Object msg)
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", WARN);
ResponseBo.put("msg", msg);
return ResponseBo;
public static ResponseBo ok(Object data)
ResponseBo ResponseBo = new ResponseBo();
ResponseBo.put("code", SUCCESS);
ResponseBo.put("data", data);
return ResponseBo;
public static ResponseBo ok()
return new ResponseBo();
public static ResponseBo error()
return ResponseBo.error("");
@Override
public ResponseBo put(String key, Object value)
super.put(key, value);
return this;
05::05 分布式锁设计
/**
* @author yangzhenyu
* 日志模板
* */
public abstract class BaseLog implements BaseLogInterface
private static Logger log = LoggerFactory.getLogger(BaseLog.class);
private ReentrantLock lock = new ReentrantLock();
private String clazz;
public void setClazz(String clazz)
this.clazz = clazz;
/**
* 打印错误日志
* */
public void error(String msg)
try
lock.lock();
log.error(msg);
finally
lock.unlock();
/**
* 打印普通日志
* */
public void info(String msg)
try
lock.lock();
log.info(msg);
finally
lock.unlock();
/**
* 返回时间日志
* */
public final void endLog(String msg,long startTime)
try
lock.lock();
long endTime = System.currentTimeMillis();
log.info("<======执行源:【】==> 执行结束,耗时:秒 =======>",clazz,msg,((endTime-startTime)/1000));
finally
lock.unlock();
/**
* 返回时间错误日志
* */
public final void endLogError(String msg,long startTime,Throwable e)
try
lock.lock();
long endTime = System.currentTimeMillis();
log.error(String.format("<======执行源:【%s】==> %s执行结束,耗时:%s秒 =======>",clazz,msg,((endTime-startTime)/1000)),e);
finally
lock.unlock();
/**
* 返回开始时间
* */
public final long startLog(String msgKey,String param)
try
lock.lock();
log.info("<======执行源:【】==>方法[]开始执行 =======>",clazz,msgKey);
log.info("<======参数:【】 =======>",param);
return System.currentTimeMillis();
finally
lock.unlock();
/**
* @author yangzhenyu
* */
@Api(value = "ZooKeeper 练习", tags = "ZooKeeper 练习")
@RestController
@RequestMapping(value="api/zk")
public class ZKController extends BaseLog
@Override
public long init(String name, String param)
this.setClazz("ZKController");
return startLog(name,param);
/**
* ZK客户端CuratorFramework实例
* */
@Autowired
private CuratorFramework curatorFramework;
/**
* 分布式锁设计原理
* 通过控制创建与共享资源相关的"临时顺序节点"和动态Watcher监听机制,从而控制多线程对共享资源的并发访问。
*
* 而ZKnode节点将对应一个具体的路径-跟Unix文件夹路径类似 ,需要以/开头
* */
private static final String PATH = "/yzy/zklock/";
/**
* ZooKeeper分布式锁设计
* */
@ApiOperation(value = "ZooKeeper分布式锁设计",notes = "ZooKeeper分布式锁设计")
@ResponseBody
@PostMapping("/lock")
public ResponseBo lock(String key) throws Exception
String msgValue = "lock";
long startTime = init(msgValue, key);
/**
* 创建zk互斥锁组件实例
* */
InterProcessMutex mutex = new InterProcessMutex(curatorFramework,PATH+ UUID.randomUUID()+"-lock");
try
/**
* 采用互斥锁组件获取分布式锁,尝试最大时间为10秒
* */
if (mutex.acquire(10L, TimeUnit.SECONDS))
//TODO : 真正的核心处理逻辑
this.info("获取到分布式锁");
this.info("执行业务逻辑");
else
this.error("获取zk分布式锁失败");
throw new MyException(ExceptionEnum.LOCK_ERROR);
endLog(msgValue,startTime);
return ResponseBo.ok();
catch (MyException m)
endLogError(msgValue,startTime,m);
return ResponseBo.error("失败:" + ExceptionEnum.getDesc(m.getCode()))SpringBoot电商项目实战 — Zookeeper的分布式锁实现
分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁
《大厂高并发分布式锁从入门到实战》第4讲之Zookeeper分布式锁