分布式锁实现

Posted tarolord

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁实现相关的知识,希望对你有一定的参考价值。

为什么要用分布式锁?

   分析下此场景:

  • 某个小型电商网站,有个生成唯一订单号的服务。

技术图片

 

  • 如果同时多个用户下单,如何保障订单号的唯一性?

技术图片

是的,加个锁可以解决。示范代码如下:

public class OrderServiceImplWithLock implements OrderService {

    private OrderCodeGenerator ocg = new OrderCodeGenerator();

    private Lock lock = new ReentrantLock();

    // 创建订单接口
    @Override
    public void createOrder() {

        String orderCode = null;
        try {
            lock.lock();
            // 获取订单号
            orderCode = ocg.getOrderCode();
        } finally {
            lock.unlock();
        }
        // ……业务代码,此处省略100行代码
    }
}

 

  • 随着业务增长,用户量增加,单台服务器已经无法满足了~~那么只能集群解决了!做了集群后,订单生成也改进为共享服务:

技术图片

在各tomcat中使用了各自的锁(jvm的锁),但不能保障订单号唯一。

  • 此时就需要用到分布式锁(用途:在分布式环境下协同共享资源的使用)

技术图片

 

分布式锁有哪些实现方式?

  • 分布式锁大致有如下3种常用实现方式:

   技术图片

  •  基于zookeeper实现,注意:zookeeper有个特效是同父级下的节点不重名。

  技术图片

 

使用的临时节点,其特性是:当客户端与zookeeper断开连接后,临时节点会自动删除(那么锁也就释放了)。示范代码实现如下:

public class ZKDistributeLock implements Lock {

    private String lockPath;

    private ZkClient client;

    public ZKDistributeLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public boolean tryLock() { 
     // 不会阻塞 // 创建节点 try { client.createEphemeral(lockPath); } catch (ZkNodeExistsException e) { return false; } return true; } @Override public void unlock() { client.delete(lockPath); }
@Override
public void lock() { // 如果获取不到锁,阻塞等待 if (!tryLock()) { // 没获得锁,阻塞自己 waitForLock(); // 再次尝试 lock(); } } private void waitForLock() { CountDownLatch cdl = new CountDownLatch(1); IZkDataListener listener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("----收到节点被删除了-------------"); cdl.countDown(); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { } }; client.subscribeDataChanges(lockPath, listener); // 阻塞自己 if (this.client.exists(lockPath)) { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消注册 client.unsubscribeDataChanges(lockPath, listener); } //省略部分不重要代码 }

此时出现一个问题:zookeeper的watch机制会实时通知用户,那么假如有1000个用户同事竞争锁,只有1个用户获得了锁,业务处理完后释放了锁,剩余的999个用户会同时收到watch通知...从而引起一个惊群效应!!!

  • 惊群效应在较大规模的环境中带来如下危害:

    技术图片

  • 那么采用临时顺序节点:

     技术图片

使用零时顺序节点,示例代码如下:

public class ZKDistributeImproveLock implements Lock {

    /*
     * 利用临时顺序节点来实现分布式锁
     * 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
     * 释放锁:删除自己创建的临时顺序节点
     */
    private String LockPath;

    private ZkClient client;

    private String currentPath;

    private String beforePath;

    public ZKDistributeImproveLock(String lockPath) {
        super();
        LockPath = lockPath;
        client = new ZkClient("localhost:2181");
        client.setZkSerializer(new MyZkSerializer());
        if (!this.client.exists(LockPath)) {
            try {
                this.client.createPersistent(LockPath);
            } catch (ZkNodeExistsException e) {

            }
        }
    }

    @Override
    public boolean tryLock() {
        if (this.currentPath == null) {
            currentPath = this.client.createEphemeralSequential(LockPath + "/", "aaa");
        }
        // 获得所有的子
        List<String> children = this.client.getChildren(LockPath);

        // 排序list
        Collections.sort(children);

        // 判断当前节点是否是最小的
        if (currentPath.equals(LockPath + "/" + children.get(0))) {
            return true;
        } else {
            // 取到前一个
            // 得到字节的索引号
            int curIndex = children.indexOf(currentPath.substring(LockPath.length() + 1));
            beforePath = LockPath + "/" + children.get(curIndex - 1);
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // 阻塞等待
            waitForLock();
            // 再次尝试加锁
            lock();
        }
    }

    private void waitForLock() {

        CountDownLatch cdl = new CountDownLatch(1);

        // 注册watcher
        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("-----监听到节点被删除");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        client.subscribeDataChanges(this.beforePath, listener);

        // 怎么让自己阻塞
        if (this.client.exists(this.beforePath)) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 醒来后,取消watcher
        client.unsubscribeDataChanges(this.beforePath, listener);
    }

    @Override
    public void unlock() {
        // 删除节点
        this.client.delete(this.currentPath);
    }
    
        //省略不重要代码  
}

 

  • 基于Redis实现 

     技术图片

 

示例代码如下:

public class RedisLockImpl implements Lock {
    StringRedisTemplate stringRedisTemplate; // redis工具类
    String resourceName = null;
    int timeout = 10;

    ThreadLocal<String> lockRandomValue = new ThreadLocal<>();

    /**
     * 构建一把锁
     *
     * @param resourceName 资源唯一标识
     * @param timeout      资源锁定超时时间~防止资源死锁,单位秒
     */
    public RedisLockImpl(String resourceName, int timeout, StringRedisTemplate stringRedisTemplate) {
        this.resourceName = "lock_" + resourceName;
        this.timeout = timeout;
        this.stringRedisTemplate = stringRedisTemplate;
    }


    @Override
    public boolean tryLock() {
        Boolean lockResult = stringRedisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                // 随机设置一个值
                lockRandomValue.set(UUID.randomUUID().toString());
                Boolean status = connection.set(resourceName.getBytes(), lockRandomValue.get().getBytes(),
                        Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT);
                return status;
            }
        });
        return lockResult;
    }

    Lock lock = new ReentrantLock();

    // 多台机器的情况下,会出现大量的等待,加重redis的压力。 在lock方法上,加入同步关键字。单机同步,多机用redis
    @Override
    public void lock() {
        lock.lock();
        try {
            while (!tryLock()) {
                // 监听,锁删除的通知
                stringRedisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                        try {
                            CountDownLatch waiter = new CountDownLatch(1);
                            // 等待通知结果
                            connection.subscribe((message, pattern) -> {
                                // 收到通知,不管结果,立刻再次抢锁
                                waiter.countDown();
                            }, (resourceName + "_unlock_channel").getBytes());
                            // 等待一段时间,超过这个时间都没收到消息,肯定有问题
                            waiter.await(timeout, TimeUnit.SECONDS);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return true; //随便返回一个值都没问题
                    }
                });
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock() {
        // 释放锁。(底层框架开发者要防止被API的调用者误调用)
        // 错误示范 stringRedisTemplate.delete(resourceName);
        // 1、 要比对内部的值,同一个线程,才能够去释放锁。 2、 同时发出通知
        if (lockRandomValue.get().equals(stringRedisTemplate.opsForValue().get(resourceName))) {
            stringRedisTemplate.delete(resourceName); // 删除资源
            stringRedisTemplate.execute(new RedisCallback<Long>() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    // 发送通知
                    Long received = connection.publish((resourceName + "_unlock_channel").getBytes(), "".getBytes());
                    return received;
                }
            });
        }
    }

     //省略不重要代码   
}

 

总的来说:

技术图片

 

 

 

 

以上是关于分布式锁实现的主要内容,如果未能解决你的问题,请参考以下文章

分布式锁三种解决方案

编程实践分布式锁的实现代码

分布式锁,及Redis实现分布式锁

基于zookeeper实现分布式锁

分布式锁的作用及实现

基于zookeeper实现分布式锁