RedisRedis 如何实现分布式锁
Posted 没对象的指针
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RedisRedis 如何实现分布式锁相关的知识,希望对你有一定的参考价值。
Redis 如何实现分布式锁
1. 什么是分布式锁
分布式锁其实就是,控制分布式系统不同进程共同访问共享资源
的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性
。
1.1 分布式锁的特点
-
互斥性:任意时刻,只有一个客户端能持有锁;
-
可重入性:一个线程获取锁之后,可以再次对其请求加锁;
-
锁超时释放:持有锁超时释放,防止不必要的资源浪费,也可以防止死锁;
-
高效、高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效;
-
安全性:锁只能被持有的客户端删除,不能被其他客户端删除。
1.2 分布式锁的场景
-
使用分布式锁的场景一般需要满足以下场景:
- 系统是一个
分布式系统
,Java 的锁已经锁不住了; - 操作
共享资源
,比如库里唯一的用户数据; 同步访问
,即多个进程
同时操作共享资源。
- 系统是一个
-
分布式锁的业务场景
- 扣减库存
- 抢红包
- …
1.3 分布式锁的实现方式
-
数据库乐观锁;
-
基于 ZooKeeper 的分布式锁;
-
基于 Redis 的分布式锁。
这里主要介绍使用 Redis 实现分布式锁的方案。
2. Redis 实现分布式锁
2.1 setnx + expire
SETNX
是 SET IF NOT EXISTS
的简写。命令格式是 SETNX key value,如果 lockKey 不存在,则 SETNX 成功返回 1,如果这个 lockKey 已经存在了,则返回 0。
伪代码:
// 1. 加锁
if(jedis.setnx(lockKey, lockValue) == 1)
// 2. 设置过期时间
jedis.expire(lockKey, expireTime);
try
// 3. 业务处理
do something;
catch (Exception e)
log.error("处理失败,", e);
finally
// 4. 释放锁
jedis.del(lockKey);
在这个方案中 setnx
与 expire
「不是原子操作」,如果执行完第一步 jedis.setnx()
加锁后异常了,第二步 jedis.expire()
未执行,相当于这个锁没有过期时间,「有产生死锁的可能」。正对这个问题如何改进?
2.2 set ex px nx
基于 Redis 的 SET 扩展命令(SET key value[EX seconds][PX milliseconds][NX|XX]
),保证 SETNX + EXPIRE
两条指令的原子性。
-
EX second :设置键的过期时间为 second 秒;
-
PX millisecond :设置键的过期时间为 millisecond 毫秒;
-
NX :表示 key 不存在的时候,才能 set 成功,也即保证只有第一个客户端请求才能获得锁,而其他客户端请求只能等其释放锁,才能获取;
-
XX :只在键已经存在时,才对键进行设置操作。
伪代码:
// 1. 加锁并设置过期时间
if(jedis.set(lockKey, lockValue, "NX", "EX", 100s) == 1)
try
// 2. 业务处理
do something;
catch (Exception e)
log.error("处理失败,", e);
finally
// 3. 释放锁
jedis.del(lockKey);
在这个方案中存在两个问题:
-
锁过期释放了,业务还没执行完。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的了。
-
锁被别的线程误删。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完。
2.3 set ex px nx + 校验唯一随机值,再删除
既然锁可能被别的线程误删,那我们给 value 值设置一个标记当前线程唯一的随机数,在删除的时候,校验一下,就可以了。
伪代码:
// 1. 加锁并设置过期时间
if(jedis.set(lockKey, uni_lockValue, "NX", "EX", 100s) == 1)
try
// 2. 业务处理
do something;
catch (Exception e)
log.error("处理失败,", e);
finally
// 3. 判断是不是当前线程加的锁
if (uni_lockValue.equals(jedis.get(lockKey)))
// 4. 释放锁
jedis.del(lockKey);
这里的 3. 是非原子性的,我们使用 lua 脚本来优化一下
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end;
这个方案还是会存在**「锁过期释放,业务没执行完」**的问题,有些小伙伴认为,稍微把锁过期时间设置长一些就可以了。其实我们设想一下,是否可以给获得锁的线程,开启一个定时守护线程
,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放。
2.4 Redisson 实现分布式锁
只要线程一加锁成功,就会启动一个 watch dog 看门狗,它是一个后台线程,会每隔 10s 检查一下,如果 线程1 还持有锁,那么就会不断的延长锁 key 的生存时间。Redisson 完美解决了「锁过期释放,业务没执行完」问题。
package com.pointer.mall.common.util;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author gaoyang
* @date 2023-02-23 20:24
*/
@Slf4j
@Component
public class RedissonUtil
@Resource
private RedissonClient redissonClient;
/**
* 加锁
*
* @param lockKey
*/
public void lock(String lockKey)
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
/**
* 带过期时间的锁
*
* @param lockKey key
* @param leaseTime 上锁后自动释放锁时间
*/
public void lock(String lockKey, long leaseTime)
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, TimeUnit.SECONDS);
/**
* 带超时时间的锁
*
* @param lockKey key
* @param leaseTime 上锁后自动释放锁时间
* @param unit 时间单位
*/
public void lock(String lockKey, long leaseTime, TimeUnit unit)
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, unit);
/**
* 尝试获取锁
*
* @param lockKey key
* @return
*/
public boolean tryLock(String lockKey)
RLock lock = redissonClient.getLock(lockKey);
return lock.tryLock();
/**
* 尝试获取锁
*
* @param lockKey key
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return boolean
*/
public boolean tryLock(String lockKey, long waitTime, long leaseTime)
RLock lock = redissonClient.getLock(lockKey);
try
return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
catch (InterruptedException e)
log.error("RedissonUtils - tryLock异常", e);
return false;
/**
* 尝试获取锁
*
* @param lockKey key
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @param unit 时间单位
* @return boolean
*/
public boolean tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit)
RLock lock = redissonClient.getLock(lockKey);
try
return lock.tryLock(waitTime, leaseTime, unit);
catch (InterruptedException e)
log.error("RedissonUtils - tryLock异常", e);
return false;
/**
* 释放锁
*
* @param lockKey key
*/
public void unlock(String lockKey)
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
/**
* 是否存在锁
*
* @param lockKey key
* @return
*/
public boolean isLocked(String lockKey)
RLock lock = redissonClient.getLock(lockKey);
return lock.isLocked();
以上是关于RedisRedis 如何实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章