基于zookeeper的分布式锁实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于zookeeper的分布式锁实现 相关的知识,希望对你有一定的参考价值。

  • 工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现

     

    准备工作

    有几个帮助类,先把代码放上来

    ZKClient 对zk的操作做了一个简单的封装

     

    Java代码  

     ZKUtil 针对zk路径的一个工具类

    Java代码  

     NetworkUtil 获取本机IP的工具方法

    Java代码  

     

    --------------------------- 正文开始  -----------------------------------

    这种实现非常简单,具体的流程如下


    技术分享
     对应的实现如下

    Java代码  

     

    总结

    网上有很多文章,大家的方法大多数都是创建一个root根节点,每一个trylock的客户端都会在root下创建一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变更watcher(为了避免羊群效应,可以只添加前一个节点的变更通知) .如果创建的节点的序号是最小,则获取到锁,否则继续等待root的child 变更


  1. package zk.lock;  

  2.   

  3.   

  4. import zk.util.NetworkUtil;  

  5. import zk.util.ZKUtil;  

  6.   

  7. /** 

  8.  * User: zhenghui 

  9.  * Date: 14-3-26 

  10.  * Time: 下午8:37 

  11.  * 分布式锁实现. 

  12.  * 

  13.  * 这种实现的原理是,创建某一个任务的节点,比如 /lock/tasckname 然后获取对应的值,如果是当前的Ip,那么获得锁,如果不是,则没获得 

  14.  * .如果该节点不存在,则创建该节点,并把改节点的值设置成当前的IP 

  15.  */  

  16. public class DistributedLock01 {  

  17.   

  18.     private ZKClient zkClient;  

  19.   

  20.   

  21.     public static final String LOCK_ROOT = "/lock";  

  22.     private String lockName;  

  23.   

  24.   

  25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  

  26.         //先创建zk链接.  

  27.         this.createConnection(connectString,sessionTimeout);  

  28.   

  29.         this.lockName = lockName;  

  30.     }  

  31.   

  32.     public boolean tryLock(){  

  33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  

  34.         String localIp = NetworkUtil.getNetworkAddress();  

  35.         try {  

  36.             if(zkClient.exists(path)){  

  37.                 String ownnerIp = zkClient.getData(path);  

  38.                 if(localIp.equals(ownnerIp)){  

  39.                     return true;  

  40.                 }  

  41.             } else {  

  42.                 zkClient.createPathIfAbsent(path,false);  

  43.                 if(zkClient.exists(path)){  

  44.                     String ownnerIp = zkClient.getData(path);  

  45.                     if(localIp.equals(ownnerIp)){  

  46.                         return true;  

  47.                     }  

  48.                 }  

  49.             }  

  50.         } catch (Exception e) {  

  51.             e.printStackTrace();  

  52.         }  

  53.         return false;  

  54.     }  

  55.   

  56.   

  57.     /** 

  58.      * 创建zk连接 

  59.      * 

  60.      */  

  61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  

  62.         if(zkClient != null){  

  63.             releaseConnection();  

  64.         }  

  65.         zkClient = new ZKClient(connectString,sessionTimeout);  

  66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  

  67.     }  

  68.     /** 

  69.      * 关闭ZK连接 

  70.      */  

  71.     protected void releaseConnection() throws InterruptedException {  

  72.         if (zkClient != null) {  

  73.             zkClient.close();  

  74.         }  

  75.     }  

  76.   

  77. }  

  1. package zk.util;  

  2.   

  3. import java.net.InetAddress;  

  4. import java.net.NetworkInterface;  

  5. import java.util.Enumeration;  

  6.   

  7. /** 

  8.  * User: zhenghui 

  9.  * Date: 14-4-1 

  10.  * Time: 下午4:47 

  11.  */  

  12. public class NetworkUtil {  

  13.   

  14.     static private final char COLON = ‘:‘;  

  15.   

  16.     /** 

  17.      * 获取当前机器ip地址 

  18.      * 据说多网卡的时候会有问题. 

  19.      */  

  20.     public static String getNetworkAddress() {  

  21.         Enumeration<NetworkInterface> netInterfaces;  

  22.         try {  

  23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  

  24.             InetAddress ip;  

  25.             while (netInterfaces.hasMoreElements()) {  

  26.                 NetworkInterface ni = netInterfaces  

  27.                         .nextElement();  

  28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  

  29.                 while(addresses.hasMoreElements()){  

  30.                     ip = addresses.nextElement();  

  31.                     if (!ip.isLoopbackAddress()  

  32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  

  33.                         return ip.getHostAddress();  

  34.                     }  

  35.                 }  

  36.             }  

  37.             return "";  

  38.         } catch (Exception e) {  

  39.             return "";  

  40.         }  

  41.     }  

  42. }  

  1. package zk.util;  

  2.   

  3. /** 

  4.  * User: zhenghui 

  5.  * Date: 14-3-26 

  6.  * Time: 下午9:56 

  7.  */  

  8. public class ZKUtil {  

  9.   

  10.     public static final String SEPARATOR = "/";  

  11.   

  12.     /** 

  13.      * 转换path为zk的标准路径 以/开头,最后不带/ 

  14.      */  

  15.     public static String normalize(String path) {  

  16.         String temp = path;  

  17.         if(!path.startsWith(SEPARATOR)) {  

  18.             temp = SEPARATOR + path;  

  19.         }  

  20.         if(path.endsWith(SEPARATOR)) {  

  21.             temp = temp.substring(0, temp.length()-1);  

  22.             return normalize(temp);  

  23.         }else {  

  24.             return temp;  

  25.         }  

  26.     }  

  27.   

  28.     /** 

  29.      * 链接两个path,并转化为zk的标准路径 

  30.      */  

  31.     public static String contact(String path1,String path2){  

  32.         if(path2.startsWith(SEPARATOR)) {  

  33.             path2 = path2.substring(1);  

  34.         }  

  35.         if(path1.endsWith(SEPARATOR)) {  

  36.             return normalize(path1 + path2);  

  37.         } else {  

  38.             return normalize(path1 + SEPARATOR + path2);  

  39.         }  

  40.     }  

  41.   

  42.     /** 

  43.      * 字符串转化成byte类型 

  44.      */  

  45.     public static byte[] toBytes(String data) {  

  46.         if(data == null || data.trim().equals("")) return null;  

  47.         return data.getBytes();  

  48.     }  

  49. }  

  1. package zk.lock;  

  2.   

  3. import org.apache.zookeeper.*;  

  4. import org.apache.zookeeper.data.Stat;  

  5. import zk.util.ZKUtil;  

  6.   

  7. import java.util.concurrent.CountDownLatch;  

  8. import java.util.concurrent.TimeUnit;  

  9.   

  10. /** 

  11.  * User: zhenghui 

  12.  * Date: 14-3-26 

  13.  * Time: 下午8:50 

  14.  * 封装一个zookeeper实例. 

  15.  */  

  16. public class ZKClient implements Watcher {  

  17.   

  18.     private ZooKeeper zookeeper;  

  19.   

  20.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  

  21.   

  22.   

  23.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  

  24.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  

  25.         System.out.println("connecting zk server");  

  26.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  

  27.             System.out.println("connect zk server success");  

  28.         } else {  

  29.             System.out.println("connect zk server error.");  

  30.             throw new Exception("connect zk server error.");  

  31.         }  

  32.     }  

  33.   

  34.     public void close() throws InterruptedException {  

  35.         if (zookeeper != null) {  

  36.             zookeeper.close();  

  37.         }  

  38.     }  

  39.   

  40.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  

  41.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  

  42.         path = ZKUtil.normalize(path);  

  43.         if (!this.exists(path)) {  

  44.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  

  45.         }  

  46.     }  

  47.   

  48.     public boolean exists(String path) throws Exception {  

  49.         path = ZKUtil.normalize(path);  

  50.         Stat stat = zookeeper.exists(path, null);  

  51.         return stat != null;  

  52.     }  

  53.   

  54.     public String getData(String path) throws Exception {  

  55.         path = ZKUtil.normalize(path);  

  56.         try {  

  57.             byte[] data = zookeeper.getData(path, nullnull);  

  58.             return new String(data);  

  59.         } catch (KeeperException e) {  

  60.             if (e instanceof KeeperException.NoNodeException) {  

  61.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  

  62.             } else {  

  63.                 throw new Exception(e);  

  64.             }  

  65.         } catch (InterruptedException e) {  

  66.             Thread.currentThread().interrupt();  

  67.             throw new Exception(e);  

  68.         }  

  69.     }  

  70.   

  71.     @Override  

  72.     public void process(WatchedEvent event) {  

  73.         if (event == nullreturn;  

  74.   

  75.         // 连接状态  

  76.         Watcher.Event.KeeperState keeperState = event.getState();  

  77.         // 事件类型  

  78.         Watcher.Event.EventType eventType = event.getType();  

  79.         // 受影响的path  

  80. //        String path = event.getPath();  

  81.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  

  82.             // 成功连接上ZK服务器  

  83.             if (Watcher.Event.EventType.None == eventType) {  

  84.                 System.out.println("zookeeper connect success");  

  85.                 connectedSemaphore.countDown();  

  86.             }  

  87.         }  

  88.         //下面可以做一些重连的工作.  

  89.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  

  90.             System.out.println("zookeeper Disconnected");  

  91.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  

  92.             System.out.println("zookeeper AuthFailed");  

  93.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  

  94.             System.out.println("zookeeper Expired");  

  95.         }  

  96.     }  

  97. }  

框架/平台构成:Maven+Springmvc + Mybatis + Shiro(权限)+ Tiles(模板) +ActiveMQ(消息队列) + Rest(服务) + WebService(服务)+ EHcache(缓存) + Quartz(定时调度)+ html5(支持PC、iosandroid)用户权限系统:组织结构:角色、用户、用户组、组织机构;权限点:页面、方法、按钮、数据权限、分级授权项目管理新体验:快速出原型系统、组件树、版本控制、模块移植、协同开发、实时监控、发布管理可持续集成:所有组件可移植、可定制、可扩充,开发成果不断积累,形成可持续发展的良性循环支持平台平台: Windows XP、Windows 7 、Windows 10 、 Linux 、 Unix服务器容器:Tomcat 5/6/7 、Jetty、JBoss、WebSphere 8.5                                           


以上是关于基于zookeeper的分布式锁实现 的主要内容,如果未能解决你的问题,请参考以下文章

基于zookeeper实现分布式锁

Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁

Redis实现分布式锁与Zookeeper实现分布式锁区别

基于zookeeper实现分布式锁

Zookeeper分布式锁

漫谈分布式锁之ZooKeeper实现