Zookeeper实现分布式锁

Posted 安静

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper实现分布式锁相关的知识,希望对你有一定的参考价值。

package com.web.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * 用于跨系统或者跨服务器之间的锁,不能用于同一个虚拟机
 * @author Administrator
 *
 */
public class LockUtil {
    /**
     * zk 客户端
     */
    private static CuratorFramework client = null;
    
    private static Logger logger = Logger.getLogger(LockUtil.class);
    
    /**
     * CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行
     * 排他锁  阻塞:资源控制实现 
     */
    protected static CountDownLatch latch = new CountDownLatch(1);
    
    protected static CountDownLatch shardLocklatch = new CountDownLatch(1);
    
    /**
     * 谁当前调用获取锁的,实际生产中代表IP地址
     */
    private static String selfIdentity = null;
    
    private static String selfNodeName = null;
    
    public static synchronized void init(String connectString) {
        if (client != null)
            return;
        
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
            .connectString(connectString)
            .sessionTimeoutMs(10000)
            .retryPolicy(retryPolicy)
            .namespace("LockService")
            .build();
        client.start();
        
        // 创建锁目录
        try {
            if (client.checkExists().forPath("/ExclusiveLockDemo") == null) {
                client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/ExclusiveLockDemo");
            }
            // 创建锁监听
            addChildWatcher("/ExclusiveLockDemo");
            if (client.checkExists().forPath("/ShardLockDemo") == null) {
                client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/ShardLockDemo");
            }
        }
        catch (Exception e) {
            logger.error("ZK服务器连接不上");
            throw new RuntimeException("ZK服务器连接不上");
        }
    }
    /**
     * 获取排他锁
     * <功能详细描述>
     */
    public static synchronized void getExclusiveLock() {
        while (true) {
            try {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath(
                    "/ExclusiveLockDemo/lock");
                logger.info("成功获取到锁");
                return;// 如果节点创建成功,即说明获取锁成功
            }
            catch (Exception e) {
                logger.info("此次获取锁没有成功");
                try {
                    //如果没有获取到锁,需要重新设置同步资源值
                    if (latch.getCount() <= 0) {
                        latch = new CountDownLatch(1);
                    }
                    latch.await();
                }
                catch (InterruptedException e1) {
                    e1.printStackTrace();
                    logger.error("", e1);
                }
            }
        }
    }
    
    /**
     * 
     * @param type
     *            0为读锁,1为写锁
     * @param identity
     *            获取当前锁的所有者
     */
    public static boolean getShardLock(int type, String identity) {
        if (identity == null || "".equals(identity)) {
            throw new RuntimeException("identity不能为空");
        }
        if (identity.indexOf("-") != -1) {
            throw new RuntimeException("identity不能包含字符-");
        }
        if (type != 0 && type != 1) {
            throw new RuntimeException("type只能为0或者1");
        }
        String nodeName = null;
        if (type == 0) {
            nodeName = "R" + identity + "-";
        }
        else if (type == 1) {
            nodeName = "W" + identity + "-";
        }
        selfIdentity = nodeName;
        try {
            //if (client.checkExists().forPath("/ShardLockDemo/" + nodeName) == null)
            selfNodeName =
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(Ids.OPEN_ACL_UNSAFE).forPath(
                    "/ShardLockDemo/" + nodeName);
            logger.info("创建节点:" + selfNodeName);
            List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo");
            if (!canGetLock(lockChildrens, type, nodeName.substring(0, nodeName.length() - 1), false)) {
                shardLocklatch.await();
            }
            // return;// 获得锁成功就返回
        }
        catch (Exception e) {
            logger.info("出现异常", e);
            return false;
        }
        
        logger.info("成功获取锁");
        return true;
    }
    /**
     * java -jar lock.jar 0 192.168.1.1
     * <功能详细描述>
     * @param childrens
     * @param type
     * @param identity
     * @param reps
     * @return
     */
    private static boolean canGetLock(List<String> childrens, int type, String identity, boolean reps) {
        boolean res = false;
        if (childrens.size() <= 0)
            return true;
        
        try {
            String currentSeq = null;
            List<String> seqs = new ArrayList<String>();
            //List<String> identitys = new ArrayList<String>();
            Map<String, String> seqs_identitys = new HashMap<String, String>();
            for (String child : childrens) {
                String splits[] = child.split("-");
                seqs.add(splits[1]);
                //identitys.add(splits[0]);
                seqs_identitys.put(splits[1], splits[0]);
                if (identity.equals(splits[0]))
                    currentSeq = splits[1];
            }
            
            List<String> sortSeqs = new ArrayList<String>();
            sortSeqs.addAll(seqs);
            Collections.sort(sortSeqs);
            
            // 第一个节点,则无论是读锁还是写锁都可以获取
            if (currentSeq.equals(sortSeqs.get(0))) {
                res = true;
                logger.info("请求锁,因为是第一个请求锁的请求,所以获取成功");
                return res;
            }
            else {
                // 写锁,不是第一个就一定会失败
                if (type == 1) {
                    res = false;
                    //第一次请求取锁则设置监听,以后就不设置了,因为监听一直存在
                    //比如:前面有两个读锁,第三个为写锁,watch通知会多了一遍,当前面的读锁释放以后,写锁会去尝试,因为前面还有一个读锁,所以获取不到,然后就会重新注册watch,为导致通知两遍
                    if (reps == false)
                        addChildWatcher("/ShardLockDemo");
                    logger.info("请求写锁,因为前面有其它锁,所以获取锁失败");
                    return res;
                }
            }
            // int index =-1;
            //因为不是第一个节点,所以要判断在当前节点之前有没有写的节点存在,如果存在写锁则不能获取读锁
            boolean hasW = true;
            for (String seq : sortSeqs) {
                // ++index;
                if (seq.equals(currentSeq)) {
                    break;
                }
                if (!seqs_identitys.get(seq).startsWith("W"))
                    hasW = false;
            }
            if (type == 0 && hasW == false) {
                res = true;
            }
            else if (type == 0 && hasW == true) {
                res = false;
            }
            if (res == false) {
                // 添加监听,锁释放之后需要进行抢锁
                addChildWatcher("/ShardLockDemo");
                logger.info("因为没有获取到锁,添加锁的监听器");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return res;
    }
    /**
     * 能删除的节点尽量删除,不然会有延迟
     * <功能详细描述>
     * @return
     */
    public static boolean unlockForExclusive() {
        try {
            if (client.checkExists().forPath("/ExclusiveLockDemo/lock") != null) {
                client.delete().forPath("/ExclusiveLockDemo/lock");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
    
    public static boolean unlockForShardLock() {
        try {
            if (client.checkExists().forPath(selfNodeName) != null) {
                client.delete().forPath(selfNodeName);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
    /**
     * 添加子节点的监听
     * @param path
     * @throws Exception
     */
    public static void addChildWatcher(String path) throws Exception {
        @SuppressWarnings("resource")
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);// ppt中需要讲StartMode
        // System.out.println(cache.getCurrentData().size());
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                    
                }
                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    
                }
                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String path = event.getData().getPath();
                    System.out.println("收到监听" + path);
                    if (path.contains("ExclusiveLockDemo")) {
                        logger.info("排他锁,收到锁释放通知");
                        //CountDownLatch(1),此时修改为0 latch.wait()释放
                        latch.countDown();
                    }
                    else if (path.contains("ShardLockDemo")) {
                        logger.info("共享锁,收到锁释放通知");
                        //收到自己的通知就不处理
                        if (path.contains(selfIdentity))
                            return;
                        List<String> lockChildrens = client.getChildren().forPath("/ShardLockDemo");
                        boolean isLock = false;
                        try {
                            if (selfIdentity.startsWith("R"))
                                isLock = canGetLock(lockChildrens, 0, selfIdentity.substring(0, selfIdentity.length() - 1), true);
                            else if (selfIdentity.startsWith("W"))
                                isLock = canGetLock(lockChildrens, 1, selfIdentity.substring(0, selfIdentity.length() - 1), true);
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                        logger.info("收到锁释放监听后,重新尝试获取锁,结果为:" + isLock);
                        if (isLock) {
                            //获得锁
                            logger.info("获得锁,解除因为获取不到锁的阻塞");
                            shardLocklatch.countDown();
                        }
                    }
                }
                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    
                }
            }
        });
    }
}
package com.web;

import java.util.Calendar;
import java.util.Date;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.web.service.OrderRepository;
import com.web.service.OrderService;
import com.web.service.ProductRepository;
import com.web.service.pojo.Order;
import com.web.service.pojo.Product;
import com.web.util.LockUtil;

public class Test {
    private static ApplicationContext ctx;  
    
    static 
    {  
        ctx = new ClassPathXmlApplicationContext("applicationContext.xml");  
    }        
    
    /**
     * 测试超卖问题
     * <功能详细描述>
     */
    public static void testpurchase(){
        OrderService os = (OrderService)ctx.getBean("orderService"); 
        
        Order order = new Order();
        order.setProductId(1L);
        order.setCreateTime(new Date());
        order.setPnum(1);
        os.doOrder(order);
    }
    
    public static void testMapper(){
        ProductRepository mapper = (ProductRepository)ctx.getBean("productMapper"); 
        //测试id=1的用户查询,根据数据库中的情况,可以改成你自己的.
        System.out.println("得到用户id=1的用户信息");
        Product product = mapper.selectProductById(1L);
        System.out.println(product.getName()); 
        
        OrderRepository omapper = (OrderRepository)ctx.getBean("orderMapper"); 
        Order order = new Order();
        order.setProductId(1L);
        order.setCreateTime(new Date());
        order.setPnum(1);
        omapper.saveOrder(order);       
    }
    
    public static void testShardLog(int type,String identity){
        System.out.println("---------------开始获取锁"+identity);
        LockUtil.init("localhost:2181");
        LockUtil.getShardLock(type, identity);
        System.out.println("---------------获取锁结束"+identity);
    }
    public static void main(String[] args)  
    {  

        int type = Integer.parseInt(args[0]);//0读锁还是1写锁
        System.out.println("type="+type);//线程名称
        testShardLog(type,args[1]);
        //testShardLog(0,"f6");
//        try {
//            LockUtil.init("localhost:2181");
//            //LockUtil.addChildWatcher("/LockService");
//            LockUtil.getExclusiveLock();
//        } catch (Exception e1) {
//            // TODO Auto-generated catch block
//            e1.printStackTrace();
//        }
        
        //排他锁***************************超卖测试***************************
        
//        long nowtime = Calendar.getInstance().getTimeInMillis();
//        System.out.println("begin with "+nowtime);
//        testpurchase();
//        System.out.println("end with "+nowtime);
        
         //end排他锁***************************超卖测试***************************
        
        
    //    String []names = ctx.getBeanDefinitionNames();
//        for(String s: names){
//            System.out.println(s);
//        }
        
        try {
            Thread.sleep(20000);
            System.out.println("-------------开始--释放锁");
            LockUtil.unlockForShardLock();
            System.out.println("-------------成功--释放锁");
            Thread.sleep(30000);
            System.out.println("---------------结束退出");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } 
}

 

以上是关于Zookeeper实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper分布式锁

zooKeeper实现分布式锁

基于zookeeper的分布式锁实现

zookeeper怎么实现分布式锁

基于Zookeeper实现的分布式互斥锁 - InterProcessMutex

ZooKeeper实现读写锁