zookeeper 分布式锁
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper 分布式锁相关的知识,希望对你有一定的参考价值。
分布式锁有很多,redis也可以实现分布式锁,
http://shangdc.blog.51cto.com/10093778/1914852(查看redis的分布式锁)
zookeeper分布式锁步骤:
1、zookeeper是一个带有节点的,类似于文件目录,所以我们把锁抽象成目录,zookeeper有一个EPHEMERAL_SEQUENTIAL类型的节点, 多个线程再zookeeper创建的节点的时候,它会帮我们安排好顺序进行创建,所以这个节点下的目录都是顺序的。
2、获取当前目录的最小的节点,判断最小节点是不是当前的自己的节点,如果是说明获取锁成功了,如果不是获取锁失败了。
3、当获取锁的时候失败了,为了避免惊群效应,你要做的就是获取当前自己的节点的上一个节点,然后对该节点进行监听,当上一个节点删除的时候,会触发这个监听,通知该节点。
4、这么做,释放锁的时候,也会通知下一个节点。
什么是惊群效应:理解为肉少狼多,当一个节点删除的时候,凡是订阅了此节点的watcha的监听都会重新获取锁,都要去争夺,如果数量少还好,当数量很大的时候这种设计就是不合理也是浪费资源。
zookeeper的状态和事件类型,提前了解一下。
状态 KeeperState.Disconnected (0) 断开 * KeeperState.SyncConnected (3) 同步连接状态 * KeeperState.AuthFailed (4) 认证失败状态 * KeeperState.ConnectedReadOnly (5) 只读连接状态 * KeeperState.SaslAuthenticated (6) SASL认证通过状态 * KeeperState.Expired (-112) 过期状态 * * // EventType 是事件类型 主要关注 Create Delete DataChanged ChildrenChanged * EventType.None (-1), 无 * EventType.NodeCreated (1), * EventType.NodeDeleted (2), * EventType.NodeDataChanged (3), 结点数据变化 * EventType.NodeChildrenChanged (4); 结点子节点变化
下面是代码,自己敲下,理解一下。
package com.lhcis.spider.system.annotation; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author sdc * */ public class ZooDistributeLock implements Watcher { private static final Logger LOG = LoggerFactory.getLogger(ZooDistributeLock.class); private static final String LOCK_PATH = "/zkLock"; // 模拟开启的线程数 private static final int THREAD_NUM = 5; // 用于等待所有线程都连接成功后再执行任务 private static CountDownLatch startFlag = new CountDownLatch(1); // 用于确保所有线程执行完毕 private static CountDownLatch threadFlag = new CountDownLatch(THREAD_NUM); private ZooKeeper zk = null; private String currentPath; private String lockPath; public static void main(String[] args) { for (int i = 0; i < THREAD_NUM; i++) { final int j = i; new Thread() { @Override public void run() { ZooDistributeLock zooDistributeLock = new ZooDistributeLock(); try { zooDistributeLock.connection(); System.out.println("连接" + j); zooDistributeLock.createNode(); System.out.println("创建" + j); zooDistributeLock.getLock(); System.out.println("获取锁" + j); } catch (IOException | InterruptedException | KeeperException e) { e.printStackTrace(); } } }.start(); } try { threadFlag.await(); LOG.info("所有线程执行完毕..."); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } } /** * Disconnected为网络闪断时触发的事件,当然其他的拔掉网线、kill zookeeper server ,kill zk * connection也会触发该事件。 SyncConnected为client端重新选择下一个zk * server连接触发的事件,此时watcher有效,也就是能正常感知 * Expired为客户端重新连server时,服务端发现该session超过了设定的时长,返回给client * Expired,此时watcher失效,也就是不能正常感知 */ @Override public void process(WatchedEvent event) { Event.KeeperState state = event.getState(); Event.EventType type = event.getType(); if (Event.KeeperState.SyncConnected == state) { if (Event.EventType.None == type) { // 标识连接成功 LOG.info("成功连接上ZK服务器"); startFlag.countDown(); } if (Event.EventType.NodeDeleted == type && event.getPath().equals(this.lockPath)) { LOG.info("node:" + this.lockPath + "的锁已经被释放"); try { // 上一个节点释放了,当前节点去获取锁 getLock(); } catch (KeeperException | InterruptedException e) { LOG.error(e.getMessage(), e); } } } } /** * 连接到 ZK * * @throws IOException */ private void connection() throws IOException, InterruptedException { zk = new ZooKeeper("127.0.0.1:2181", 5000, this); // 等待连接成功后再执行下一步操作 startFlag.await(); } // 创建节点,并初始化当前路径 private void createNode() throws KeeperException, InterruptedException, UnsupportedEncodingException { this.currentPath = this.zk.create(LOCK_PATH, "".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } private void getLock() throws KeeperException, InterruptedException { if (minNode()) { doSomething(); // 释放锁 releaseLock(); } } /** * 当前是否为最小节点 * * @return */ private boolean minNode() { // 当前序号 try { initLockPath(); // 判断前一个节点存在不存在,如果存在,则表示当前节点不是最小节点 // zk.getData(this.lockPath, this, new Stat()); zk.getData(this.lockPath, true, new Stat()); LOG.info(this.currentPath + " 不是最小值,没有获取锁,等待 " + this.lockPath + " 释放锁"); return false; } catch (KeeperException e) { LOG.info(this.currentPath + " 是最小值,获得锁"); return true; } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } return true; } private void doSomething() { LOG.info("处理业务逻辑..."); } /** * 释放锁并关闭连接 * * @throws KeeperException * @throws InterruptedException */ private void releaseLock() throws KeeperException, InterruptedException { Thread.sleep(2000); if (this.zk != null) { LOG.info(this.currentPath + " 业务处理完毕,释放锁..."); zk.delete(this.currentPath, -1); this.zk.close(); LOG.info(Thread.currentThread().getName() + "关闭 zookeeper 连接"); } threadFlag.countDown(); } /** * 初始化 lockpath */ private void initLockPath() { int currentSeq = Integer.parseInt(this.currentPath.substring(LOCK_PATH.length())); // 上一个序号 int preSeq = currentSeq - 1; String preSeqStr = String.valueOf(preSeq); while (preSeqStr.length() < 10) { preSeqStr = "0" + preSeqStr; } this.lockPath = LOCK_PATH + preSeqStr; } }
参考代码:
https://juejin.im/entry/596438bc6fb9a06bb47495f1
本文出自 “不积跬步无以至千里” 博客,请务必保留此出处http://shangdc.blog.51cto.com/10093778/1958619
以上是关于zookeeper 分布式锁的主要内容,如果未能解决你的问题,请参考以下文章