手撕三种分布式锁

Posted MALE_2

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撕三种分布式锁相关的知识,希望对你有一定的参考价值。

代码对应github地址

为什么需要分布式锁

锁的作用就是多个线程或者进程对同一份资源进行修改时,保证资源是被正确地修改,例如多个线程同时对一个数字加一,由于读取、修改、赋值,不一定是一个原子操作,需要锁来保证这一个过程的原子性

分布式锁就是保证多个应用或进程对同一份资源进行操作时,结果的正确性

分布式锁常见解决方案

分布式锁需要具备的几个条件:

  • 互斥(必须):同一时刻,分布式部署的应用中,同一个方法/资源只能被一台机器上的一个线程占用
  • 锁失效保护(必须):出现客户端断电等异常情况,锁仍然能被其它客户端获取,防止死锁
  • 可重入(可选):同一个线程在没有释放锁之前,如果想再次操作,可以直接获得锁
  • 阻塞/非阻塞(可选):若没有获取到锁,返回获取失败
  • 高可用、高性能(可选):获取释放锁最好是原子操作,获取释放锁的性能要好

数据库实现分布式锁

CREATE TABLE `testlock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
  `value` varchar(1024) NOT NULL DEFAULT '锁信息',
  `expire` timestamp(6) NULL DEFAULT NULL COMMENT '超时机制',
  `request_info` varchar(255) DEFAULT NULL COMMENT '持有信息',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

加锁

insert into testlock(method_name, value, expire, request_info) values ('m1', '1', timeout, machine+thread);

释放锁

delete from myLock where method_name ='m1';

锁失效

应用起一个线程,轮询删除表中超时的数据

锁重入

若果要实现锁重入,则将value初始化为0,用悲观锁或者乐观锁锁住,然后获取锁时,判断value是否为0/当前持有信息是否为当前线程,是的话则获取锁成功,value = value+1,释放锁时同样value = value - 1,当value = 0 时,置空request_info;

BEGIN;
select * from testlock where method='';
if(value==0||request_info==current_thread){
    //success
    update testlock set value = value+1,request_info=current_thread;
}
COMMIT;

争夺锁
代码层面实现重试

单点问题
配置主从数据库

方案
优点:跟redis相比,好像优点不大
缺点:性能问题

Redis实现分布式锁

加锁

set key value PX milliseconds NX

key、value:键值对

PX milliseconds:设置键的过期时间为 milliseconds 毫秒

NX:只在键不存在时,才对键进行设置操作。SET key value NX 效果等同于 SETNX key value

PX、expireTime 参数则是用于解决没有解锁导致的死锁问题。因为如果没有过期时间,万一程序员写的代码有 bug 导致没有解锁操作,则就出现了死锁,因此该参数起到了一个“兜底”的作用。

NX 参数用于保证在多个线程并发 set 下,只会有1个线程成功,起到了锁的“唯一”性。

解锁

  1. 查询当前“锁”是否还是我们持有,因为存在过期时间,所以可能等你想解锁的时候,“锁”已经到期,然后被其他线程获取了,所以我们在解锁前需要先判断自己是否还持有“锁”
  2. 如果“锁”还是我们持有,则执行解锁操作,也就是删除该键值对,并返回成功;否则,直接返回失败。

由于当前 Redis 还没有原子命令直接支持这两步操作,所以当前通常是使用 Lua 脚本来执行解锁操作,Redis 会保证脚本里的内容执行是一个原子操作

脚本代码如下,逻辑比较简单:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end

两个参数的意义如下:

KEYS[1]:我们要解锁的 key

ARGV[1]:我们加锁时的 value,用于判断当“锁”是否还是我们持有,如果被其他线程持有了,value 就会发生变化

代码实现

首先模拟应用场景

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RedisLockTest {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    int count = 0;
    String methodName = "addOne";

    @Test
    public void test() throws InterruptedException {
        RedisLock redisLock = new RedisLock(redisTemplate);
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(100, 200, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        for (int i = 0; i < 500; i++) {
            CompletableFuture.runAsync(() -> addOne(), threadPoolExecutor);
            CompletableFuture.runAsync(() -> addOne(), threadPoolExecutor);
            CompletableFuture.runAsync(() -> addOne(), threadPoolExecutor);
        }
        // 等待50s,等剩余任务跑完
        Thread.sleep(50000);
        System.out.println(count);
    }

    public void addOne() {
        count++;
    }
}

output:1982
以上这段代码理论上count最后应该为1500,但是运行多次都少于1500,这就是我们常见的应用场景,数据不一致的问题

接下来,我们编写分布式锁

public class RedisLock {
    /* redis操作类 */
    private RedisTemplate<String, String> redisTemplate;
    // 获取分布式锁
    public boolean lock(String methodName, String lockId, long timeout) {
        if (timeout <= 1) throw new RuntimeException("过期时间应大于1毫秒");
        while (true) {
            // 没有获取到锁就自旋,应该还有挂起通知的方式,暂且不研究
            boolean lock = tryLock(methodName, lockId, timeout);
            if (lock) {
                break;
            }
            try {
                // 线程暂停50毫秒,避免请求redis太频繁
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
}
    // 实际获取锁的过程
    public boolean tryLock(String methodName, String lockId, long timeout) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(methodName, lockId, timeout, TimeUnit.MILLISECONDS);
        if (flag == null)
            return false;
        return flag;
    }
    // 解除锁的脚本
    public static final String UP_LOCK_SCRIPT = "if redis.call(\\"get\\",KEYS[1]) == ARGV[1]\\n" +
            "then\\n" +
            "    return redis.call(\\"del\\",KEYS[1])\\n" +
            "else\\n" +
            "    return 0\\n" +
            "end\\n";
    
    // 解除分布式锁
    public boolean unlock(String methodName, String lockId) {
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(UN_LOCK_SCRIPT, Boolean.class);
        Boolean result = redisTemplate.execute(redisScript, Collections.singletonList(methodName), lockId);
        if (result == null) {
            return false;
        }
        return result;
}
}

我们修改一下addOne方法

public void addOne(RedisLock redisLock) {
    long id = Thread.currentThread().getId();
    redisLock.lock(methodName, String.valueOf(id), 2000);
    count++;
    redisLock.unlock(methodName, String.valueOf(id));
}

在使用redis分布式锁的情况下,最终结果总是等于1500,注意,主线程一定得等其它线程跑完才停止,时间设置的长一点

过期未处理完怎么解决

为了防止死锁,我们会给分布式锁加一个过期时间,但是万一这个时间到了,我们业务逻辑还没处理完,怎么办?

首先,我们在设置过期时间时要结合业务场景去考虑,尽量设置一个比较合理的值,就是理论上正常处理的话,在这个过期时间内是一定能处理完毕的。

之后,我们再来考虑对这个问题进行兜底设计。

关于这个问题,目前常见的解决方法有两种:

  1. 守护线程“续命”:额外起一个线程,定期检查线程是否还持有锁,如果有则延长过期时间。Redisson
    里面就实现了这个方案,使用“看门狗”定期检查(每1/3的锁时间检查1次),如果线程还持有锁,则刷新过期时间。

  2. 超时回滚:当我们解锁时发现锁已经被其他线程获取了,说明此时我们执行的操作已经是“不安全”的了,此时需要进行回滚,并返回失败

同时,需要进行告警,人为介入验证数据的正确性,然后找出超时原因,是否需要对超时时间进行优化等等

我们来实现一个自动续期,每 1/2个过期时间检查一次,重新将过期时间设置为一个原过期时间单位的看门狗

我们对redislock增加三个属性,用延时队列来操作延时任务,更多的延时任务方法请看这片文章https://blog.csdn.net/echizao1839/article/details/105533214

/* 最大延迟次数 需要配置可自己修改代码 */
public final static long MAX_TIMES = 5;
/* 看门狗守护线程 */
private Thread daemonTread;
/* 延时队列 */
DelayQueue<DelayTask> queue = new DelayQueue<>();

public RedisLock(RedisTemplate<String, String> redisTemplate) {
    this.redisTemplate = redisTemplate;
    daemonTread = new Thread(() -> watchDog(queue), "delay-watchdog");
    daemonTread.setDaemon(true);
    daemonTread.start();
}

编写一个延时任务类

@Getter
public static class DelayTask implements Delayed {
    // 方法名称
    private String methodName;
    // 线程名称
    private String lockId;
    // 单位为毫秒
    private long expire;
    // 单位为毫秒
    private long exeTime;
    // 当前重试次数
    private long times;

    public DelayTask(String methodName, String lockId, long expire, long times) {
        this(methodName, lockId, expire);
        this.times = times;
    }

    public DelayTask(String methodName, String lockId, long expire) {
        this.methodName = methodName;
        this.lockId = lockId;
        this.expire = expire;
        this.exeTime = System.currentTimeMillis() + expire;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayTask t = (DelayTask) o;
        if (this.exeTime - t.exeTime <= 0) {
            return -1;
        } else {
            return 1;
        }
    }
}

要想延长redis key的时间,编写操作redis的脚本

// 延长锁时间lua脚本
public static final String EXTEND_EXPIRE_TIME_SCRIPT = "if redis.call(\\"get\\",KEYS[1]) == ARGV[1]\\n" +
        "then\\n" +
        "    return redis.call(\\"pexpire\\",KEYS[1],ARGV[2])\\n" +
        "else\\n" +
        "    return 0\\n" +
        "end\\n";

// 延长锁的时间
public boolean extendExpire(String methodName, String lockId, long expire) {
    DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(EXTEND_EXPIRE_TIME_SCRIPT, Boolean.class);
    Boolean result = this.redisTemplate.execute(redisScript, Collections.singletonList(methodName), lockId, String.valueOf(expire));
    if (result == null) {
        return false;
    }
    return result;
}

实现看门狗的逻辑

public void watchDog(DelayQueue<DelayTask> queue) {
    while (true) {
        try {
            DelayTask delayTask = queue.take();
            String methodName = delayTask.getMethodName();
            String threadId = delayTask.getLockId();
            long times = delayTask.getTimes();
            long expire = delayTask.getExpire();
            if (times < MAX_TIMES) {
                boolean success = extendExpire(methodName, threadId, expire);
                if (success) {
                    // 延时成功,重新加入延时队列
                    queue.add(new DelayTask(methodName, threadId, (expire + 1) / 2, times + 1));
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

守护线程续命的方案有什么漏洞

Redisson 使用看门狗(守护线程)“续命”的方案在大多数场景下是挺不错的,也被广泛应用于生产环境,但是在极端情况下还是会存在问题

问题例子如下:

  1. 线程1首先获取锁成功,将键值对写入 redis 的 master 节点
  2. 在 redis 将该键值对同步到 slave 节点之前,master 发生了故障
  3. redis 触发故障转移,其中一个 slave 升级为新的 master
  4. 此时新的 master 并不包含线程1写入的键值对,因此线程2尝试获取锁也可以成功拿到锁
  5. 此时相当于有两个线程获取到了锁,可能会导致各种预期之外的情况发生,例如最常见的脏数据

解决方法:上述问题的根本原因主要是由于 redis 异步复制带来的数据不一致问题导致的,因此解决的方向就是保证数据的一致

当前比较主流的解法和思路有两种:
1)Redis 作者提出的 RedLock;2)Zookeeper 实现的分布式锁

RedLock的优缺点

首先,该方案也是基于文章开头的那个方案(set加锁、lua脚本解锁)进行改良的,所以 antirez 只描述了差异的地方,大致方案如下。

假设我们有 N 个 Redis 主节点,例如 N = 5,这些节点是完全独立的,我们不使用复制或任何其他隐式协调系统,为了取到锁,客户端应该执行以下操作:

  1. 获取当前时间,以毫秒为单位
  2. 依次尝试从5个实例,使用相同的 key 和随机值(例如UUID)获取锁。当向Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在 5-50 毫秒之间。这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个Redis实例请求获取锁
  3. 客户端通过当前时间减去步骤1记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功
  4. 如果取到了锁,其有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)
  5. 如果由于某些原因未能获得锁(无法在至少N/2+1个Redis实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)

可以看出,该方案为了解决数据不一致的问题,直接舍弃了异步复制,只使用 master 节点,同时由于舍弃了 slave,为了保证可用性,引入了 N 个节点,官方建议是 5。

该方案看着挺美好的,但是实际上我所了解到的在实际生产上应用的不多,主要有两个原因:
1)该方案的成本似乎有点高,需要使用5个实例;
2)该方案一样存在问题。

该方案主要存以下问题:

1)严重依赖系统时钟。如果线程1从3个实例获取到了锁,但是这3个实例中的某个实例的系统时间走的稍微快一点,则它持有的锁会提前过期被释放,当他释放后,此时又有3个实例是空闲的,则线程2也可以获取到锁,则可能出现两个线程同时持有锁了。

2)如果线程1从3个实例获取到了锁,但是万一其中有1台重启了,则此时又有3个实例是空闲的,则线程2也可以获取到锁,此时又出现两个线程同时持有锁了

针对以上问题其实后续也有人给出一些相应的解法,但是整体上来看还是不够完美,所以目前实际应用得不是那么多

Zookeeper实现分布式锁

Zookeeper 的分布式锁实现方案如下:

  1. 创建一个锁目录 /locks,该节点为持久节点
  2. 想要获取锁的线程都在锁目录下创建一个临时顺序节点
  3. 获取锁目录下所有子节点,对子节点按节点自增序号从小到大排序
  4. 判断本节点是不是第一个子节点,如果是,则成功获取锁,开始执行业务逻辑操作;如果不是,则监听自己的上一个节点的删除事件
  5. 持有锁的线程释放锁,只需删除当前节点即可
  6. 当自己监听的节点被删除时,监听事件触发,则回到第3步重新进行判断,直到获取到锁

由于 Zookeeper 保证了数据的强一致性,因此不会存在之前 Redis 方案中的问题,整体上来看还是比较不错的

Zookeeper 方案的主要问题在于性能不如 Redis 那么好,当申请锁和释放锁的频率较高时,会对集群造成压力,此时集群的稳定性可用性能可能又会遭受挑战

接下来我们来编写zookeeper分布式锁
首先,自己用docker搭建zookeeper环境,然后了解zookeeper的基本结构和指令
实例化一个zookeeper客户端,我们采用单例模式来实例化

private static volatile ZooKeeper zkCli = null;
/* zookeeper的地址和端口 */
private static final String connectString = "127.0.0.1:2181";
/* 会话超时时间 */
private static final int sessionTimeout = 30000;

/**
 * 初始化
 * @return
 */
public static ZooKeeper getClient() {
    if (zkCli == null) {
        synchronized (ZooKeeper.class) {
            if (zkCli == null) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                try {
                    zkCli = new ZooKeeper(connectString, sessionTimeout, event -> {
                        if (zkCli.getState().isConnected()) {
                            // 监听连接zookeeper状态
                            countDownLatch.countDown();
                        }
                    });
                    // 因为连接需要时间,代码不能直接往下走
                    countDownLatch.await();
                } catch (Exception e) {
                    System.out.println("create zookeeper client fail!!!!!!!!!!!!!!!!!!!");
                } finally {
                    countDownLatch.countDown();
                }
            }
        }
    }
    return zkCli;
}

实现zookeeper加锁逻辑

@SneakyThrows
public static String lock(String methodName, String lockId) {

    String path = getPath(methodName);
    // 注册一个节点
    String zNode = ZookeeperUtil.getClient().create(path + "/node", lockId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    while (true) {
        // 获取所有节点
        List<String> children = ZookeeperUtil.getClient().getChildren(path, true);
        int index = children.indexOf(zNode.substring(zNode.lastIndexOf("/") + 1));
        // 判断是否第一个
        if (index != 0) {
            //否,注册监听到前一个节点,然后挂起
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Stat exists = ZookeeperUtil.getClient().exists(path + "/" + children.get(index - 1), event -> {
                countDownLatch.countDown();
            });
            if (exists != null) {
                countDownLatch.await();
            }
        } else {
            return zNode;
        }
    }
}

解锁的逻辑

@SneakyThrows
public static void unLock(String zNode) {
    // 因为删除的接口有个版本号,所以先获取节点的信息,再删除
    Stat stat = ZookeeperUtil.getClient().exists(zNode, true);
    ZookeeperUtil.getClient().delete(zNode, stat.getVersion());
}

加入缓存来减少一步zookeeper查询

/* 将方法名和zookeeper节点路径缓存起来,提高性能,可能存在的问题就是缓存下来了,有其他人偷偷删了,典型缓存不一致问题,可以根据实际情况思考 */
private static final ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();

@SneakyThrows
public static String getPath(String methodName) {

    String path = concurrentHashMap.get(methodName);
    if (path == null) {
        // 通过方法名,判断zookeeper目录节点是否存在,不存在则创建一个持久性节点
        Stat stat = ZookeeperUtil.getClient().exists("/locks/" + methodName, true);
        if (stat == null) {
            ZookeeperUtil.getClient().create("/locks/" + methodName, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        concurrentHashMap.put(methodName, "/locks/" + methodName);
    }
    return concurrentHashMap.get(methodName);
}

测试代码,具体环境和结果可以参照前面redis的,基本一样

public void addOneWithZookeeperLock() {
    long id = Thread.currentThread().getId();
    String lock = ZookeeperLock.lock(methodName, String.valueOf(id));
    count++;
    System.out.println(count);
    ZookeeperLock.unLock(lock);
}

分布式锁的选择

上面我们对三种分布式锁的实现方案进行了阐述
在日常工作中如何选择合适的方案来使用

  • 如果我们的工作环境中已经引入了redis和zookeeper中间件,那么可以优先考虑这两种再根据实际的业务情况,比如业务的性能和稳定性需求来综合考虑
  • 如果没有使用上面这两种中间件,并且性能要求不高,那么可以直接用数据库来解决
  • 如果性能不够,那么可以用redis来解决
  • 如果对稳定性和安全性要求极高,可以考虑用zookeeper

参考文章

以上是关于手撕三种分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

作为一个码农终于把MySQL日记看懂了,手撕面试官

还不会使用分布式锁?教你三种分布式锁实现的方式

你绕不开的组件—锁,4个方面手撕锁的多种实现

高并发没锁可不行,三种分布式锁详解

Redis实现分布式锁(设计模式应用实战)

Redis实现分布式锁(设计模式应用实战)