Zookeeper实现分布式锁
Posted 安静
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper实现分布式锁相关的知识,希望对你有一定的参考价值。
package com.web.util; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; /** * 用于跨系统或者跨服务器之间的锁,不能用于同一个虚拟机 * @author Administrator * */ public class LockUtil { /** * zk 客户端 */ private static CuratorFramework client = null; private static Logger logger = Logger.getLogger(LockUtil.class); /** * CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行 * 排他锁 阻塞:资源控制实现 */ protected static CountDownLatch latch = new CountDownLatch(1); protected static CountDownLatch shardLocklatch = new CountDownLatch(1); /** * 谁当前调用获取锁的,实际生产中代表IP地址 */ private static String selfIdentity = null; private static String selfNodeName = null; public static synchronized void init(String connectString) { if (client != null) return; RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(10000) .retryPolicy(retryPolicy) .namespace("LockService") .build(); client.start(); // 创建锁目录 try { if (client.checkExists().forPath("/ExclusiveLockDemo") == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(Ids.OPEN_ACL_UNSAFE) .forPath("/ExclusiveLockDemo"); } // 创建锁监听 addChildWatcher("/ExclusiveLockDemo"); if (client.checkExists().forPath("/ShardLockDemo") == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(Ids.OPEN_ACL_UNSAFE) .forPath("/ShardLockDemo"); } } catch (Exception e) { logger.error("ZK服务器连接不上"); throw new RuntimeException("ZK服务器连接不上"); } } /** * 获取排他锁 * <功能详细描述> */ public static synchronized void getExclusiveLock() { while (true) { try { client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath( "/ExclusiveLockDemo/lock"); logger.info("成功获取到锁"); return;// 如果节点创建成功,即说明获取锁成功 } catch (Exception e) { logger.info("此次获取锁没有成功"); try { //如果没有获取到锁,需要重新设置同步资源值 if (latch.getCount() <= 0) { latch = new CountDownLatch(1); } latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); logger.error("", e1); } } } } /** * * @param type * 0为读锁,1为写锁 * @param identity * 获取当前锁的所有者 */ public static boolean getShardLock(int type, String identity) { if (identity == null || "".equals(identity)) { throw new RuntimeException("identity不能为空"); } if (identity.indexOf("-") != -1) { throw new RuntimeException("identity不能包含字符-"); } if (type != 0 && type != 1) { throw new RuntimeException("type只能为0或者1"); } String nodeName = null; if (type == 0) { nodeName = "R" + identity + "-"; } else if (type == 1) { nodeName = "W" + identity + "-"; } selfIdentity = nodeName; try { //if (client.checkExists().forPath("/ShardLockDemo/" + nodeName) == null) selfNodeName = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath( "/ShardLockDemo/" + nodeName); logger.info("创建节点:" + selfNodeName); List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo"); if (!canGetLock(lockChildrens, type, nodeName.substring(0, nodeName.length() - 1), false)) { shardLocklatch.await(); } // return;// 获得锁成功就返回 } catch (Exception e) { logger.info("出现异常", e); return false; } logger.info("成功获取锁"); return true; } /** * java -jar lock.jar 0 192.168.1.1 * <功能详细描述> * @param childrens * @param type * @param identity * @param reps * @return */ private static boolean canGetLock(List<String> childrens, int type, String identity, boolean reps) { boolean res = false; if (childrens.size() <= 0) return true; try { String currentSeq = null; List<String> seqs = new ArrayList<String>(); //List<String> identitys = new ArrayList<String>(); Map<String, String> seqs_identitys = new HashMap<String, String>(); for (String child : childrens) { String splits[] = child.split("-"); seqs.add(splits[1]); //identitys.add(splits[0]); seqs_identitys.put(splits[1], splits[0]); if (identity.equals(splits[0])) currentSeq = splits[1]; } List<String> sortSeqs = new ArrayList<String>(); sortSeqs.addAll(seqs); Collections.sort(sortSeqs); // 第一个节点,则无论是读锁还是写锁都可以获取 if (currentSeq.equals(sortSeqs.get(0))) { res = true; logger.info("请求锁,因为是第一个请求锁的请求,所以获取成功"); return res; } else { // 写锁,不是第一个就一定会失败 if (type == 1) { res = false; //第一次请求取锁则设置监听,以后就不设置了,因为监听一直存在 //比如:前面有两个读锁,第三个为写锁,watch通知会多了一遍,当前面的读锁释放以后,写锁会去尝试,因为前面还有一个读锁,所以获取不到,然后就会重新注册watch,为导致通知两遍 if (reps == false) addChildWatcher("/ShardLockDemo"); logger.info("请求写锁,因为前面有其它锁,所以获取锁失败"); return res; } } // int index =-1; //因为不是第一个节点,所以要判断在当前节点之前有没有写的节点存在,如果存在写锁则不能获取读锁 boolean hasW = true; for (String seq : sortSeqs) { // ++index; if (seq.equals(currentSeq)) { break; } if (!seqs_identitys.get(seq).startsWith("W")) hasW = false; } if (type == 0 && hasW == false) { res = true; } else if (type == 0 && hasW == true) { res = false; } if (res == false) { // 添加监听,锁释放之后需要进行抢锁 addChildWatcher("/ShardLockDemo"); logger.info("因为没有获取到锁,添加锁的监听器"); } } catch (Exception e) { e.printStackTrace(); } return res; } /** * 能删除的节点尽量删除,不然会有延迟 * <功能详细描述> * @return */ public static boolean unlockForExclusive() { try { if (client.checkExists().forPath("/ExclusiveLockDemo/lock") != null) { client.delete().forPath("/ExclusiveLockDemo/lock"); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } public static boolean unlockForShardLock() { try { if (client.checkExists().forPath(selfNodeName) != null) { client.delete().forPath(selfNodeName); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 添加子节点的监听 * @param path * @throws Exception */ public static void addChildWatcher(String path) throws Exception { @SuppressWarnings("resource") final PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(StartMode.POST_INITIALIZED_EVENT);// ppt中需要讲StartMode // System.out.println(cache.getCurrentData().size()); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { String path = event.getData().getPath(); System.out.println("收到监听" + path); if (path.contains("ExclusiveLockDemo")) { logger.info("排他锁,收到锁释放通知"); //CountDownLatch(1),此时修改为0 latch.wait()释放 latch.countDown(); } else if (path.contains("ShardLockDemo")) { logger.info("共享锁,收到锁释放通知"); //收到自己的通知就不处理 if (path.contains(selfIdentity)) return; List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo"); boolean isLock = false; try { if (selfIdentity.startsWith("R")) isLock = canGetLock(lockChildrens, 0, selfIdentity.substring(0, selfIdentity.length() - 1), true); else if (selfIdentity.startsWith("W")) isLock = canGetLock(lockChildrens, 1, selfIdentity.substring(0, selfIdentity.length() - 1), true); } catch (Exception e) { e.printStackTrace(); } logger.info("收到锁释放监听后,重新尝试获取锁,结果为:" + isLock); if (isLock) { //获得锁 logger.info("获得锁,解除因为获取不到锁的阻塞"); shardLocklatch.countDown(); } } } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { } } }); } }
package com.web; import java.util.Calendar; import java.util.Date; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.web.service.OrderRepository; import com.web.service.OrderService; import com.web.service.ProductRepository; import com.web.service.pojo.Order; import com.web.service.pojo.Product; import com.web.util.LockUtil; public class Test { private static ApplicationContext ctx; static { ctx = new ClassPathXmlApplicationContext("applicationContext.xml"); } /** * 测试超卖问题 * <功能详细描述> */ public static void testpurchase(){ OrderService os = (OrderService)ctx.getBean("orderService"); Order order = new Order(); order.setProductId(1L); order.setCreateTime(new Date()); order.setPnum(1); os.doOrder(order); } public static void testMapper(){ ProductRepository mapper = (ProductRepository)ctx.getBean("productMapper"); //测试id=1的用户查询,根据数据库中的情况,可以改成你自己的. System.out.println("得到用户id=1的用户信息"); Product product = mapper.selectProductById(1L); System.out.println(product.getName()); OrderRepository omapper = (OrderRepository)ctx.getBean("orderMapper"); Order order = new Order(); order.setProductId(1L); order.setCreateTime(new Date()); order.setPnum(1); omapper.saveOrder(order); } public static void testShardLog(int type,String identity){ System.out.println("---------------开始获取锁"+identity); LockUtil.init("localhost:2181"); LockUtil.getShardLock(type, identity); System.out.println("---------------获取锁结束"+identity); } public static void main(String[] args) { int type = Integer.parseInt(args[0]);//0读锁还是1写锁 System.out.println("type="+type);//线程名称 testShardLog(type,args[1]); //testShardLog(0,"f6"); // try { // LockUtil.init("localhost:2181"); // //LockUtil.addChildWatcher("/LockService"); // LockUtil.getExclusiveLock(); // } catch (Exception e1) { // // TODO Auto-generated catch block // e1.printStackTrace(); // } //排他锁***************************超卖测试*************************** // long nowtime = Calendar.getInstance().getTimeInMillis(); // System.out.println("begin with "+nowtime); // testpurchase(); // System.out.println("end with "+nowtime); //end排他锁***************************超卖测试*************************** // String []names = ctx.getBeanDefinitionNames(); // for(String s: names){ // System.out.println(s); // } try { Thread.sleep(20000); System.out.println("-------------开始--释放锁"); LockUtil.unlockForShardLock(); System.out.println("-------------成功--释放锁"); Thread.sleep(30000); System.out.println("---------------结束退出"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
以上是关于Zookeeper实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章