分布式锁

Posted yhongyin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁相关的知识,希望对你有一定的参考价值。

分布式锁

为了保证一个方法或属性在高并发情况下的同一时间只能被一个线程访问,在单机部署的情况下,可以使用ReentrantLock或Synchronized进行互斥控制。随着发展,单机部署的系统已经不能满足业务的需要,越来越多的系统进化成分布式集群系统,原本在单机运行的锁控制已经不能实现“一个方法或属性在高并发情况下的同一时间只能被一个线程访问”的控制,要在分布式系统中实现在单机中的控制效果就必须使用分布式锁。

Redis实现方式

加入依赖

        <!--redisson分布式锁-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
        </dependency>

配置Redisson bean

    @Bean
    public Redisson redisson(){
        //单机模式
        Config config = new Config();
//        config.useClusterServers().addNodeAddress("","","");//集群模式
        config.useSingleServer().setAddress("redis://192.168.86.126:6379");
        return (Redisson) Redisson.create(config);
    }

实现

        //业务代码加redis分布式锁
        String lockKey = "redis-lock-key";
        RLock rlock = redisson.getLock(lockKey);

        //加锁并设置超时时间,自动续幂
        rlock.lock(30, TimeUnit.SECONDS);
        try {
            //加锁业务代码
            ……

        } finally {
            //解锁
            rlock.unlock();
        }

Zookeeper实现

加入依赖

        <!--zookeeper的jar依赖包-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Lock接口

public interface Lock {

    void lock() throws Exception;

    void unlock() throws Exception;
}

写一个抽象类AbstractZookeeperLock实现加锁和解锁

public abstract class AbstractZookeeperLock implements Lock {

    protected String lock;

    protected String zkAddress = "localhost:2181";

    protected ZkClient zkClient = new ZkClient(zkAddress);

    @Override
    public final void lock() throws Exception {

        //尝试获取锁
        if (tryLock()) {
            //拿到锁
            System.out.println("获取锁成功...");
        } else {
            //尝试获取锁未成功,等待获取锁,阻塞
            // 如果此处已经不阻塞了,那么可以继续执行下面的代码
            waitLock();
            //阻塞结束,继续获取锁
            lock();
        }
    }


    @Override
    public final void unlock() throws Exception {
        //临时节点:临时存储,当客户端的链接与zookeeper断开后,临时节点自动删除
        //关闭链接就解锁了
        if (zkClient != null) {
//            zkClient.delete("/path/xxx");
            zkClient.close();
            System.out.println("解锁成功...");
        }
        //持久化节点:永久在zookeeper上
    }

    protected abstract boolean tryLock();

    protected abstract void waitLock();
}

具体使用类实现尝试获取锁和等待获取锁

public class ZookeeperDistributedLock extends AbstractZookeeperLock {

    private CountDownLatch countDownLatch;

    public ZookeeperDistributedLock(String lockName) {
        lock = lockName;
    }

    //尝试获取锁
    @Override
    protected boolean tryLock() {

        try {
            //创建一个临时节点
            zkClient.createEphemeral(lock);
            //获取锁成功
            return true;
        } catch (Exception e) {
            //获取锁失败
            return false;
        }
    }

    //等待获取锁
    @Override
    protected void waitLock() {
        //如果已经有线程创建了临时节点,那么其他线程只能等待,不能再创建该临时节点
        //那么就监听你这个临时节点,如果该节点被删除了,那我等待结束,就又可以创建临时节点

        //订阅数据改变,就是监听参数中指定的那个节点
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    //计数减1
                    countDownLatch.countDown();
                }
            }
        };
        //1、监听
        zkClient.subscribeDataChanges(lock, listener);

//        2、判断那个锁的节点是否存在
        if (zkClient.exists(lock)) {
            countDownLatch = new CountDownLatch(1);
            try {
                //等待
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        3、订阅要取消一下
        zkClient.unsubscribeDataChanges(lock, listener);
    }
}

具体使用

        //业务代码加zookeeper分布式锁
        ZookeeperDistributedLock lock = new ZookeeperDistributedLock("/lock_nodeName");
        try {
            lock.lock();
           //加锁业务代码
            ……
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

以上是关于分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

分布式锁三种解决方案

MySQL系列:kafka停止命令

硬核!管理mysql数据库的工具

java开发的项目案例,大厂内部资料

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段

Java进阶之光!2021必看-Java高级面试题总结