详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题
Posted 阿啄debugIT
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题相关的知识,希望对你有一定的参考价值。
前言
在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问,Java中我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。
但是现在公司都是流行分布式架构,在分布式环境下,如何保证不同节点的线程同步执行呢?
因此就引出了分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。
在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。
而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。
分布式锁的特点
分布式锁一般有如下的特点:
- 互斥性: 同一时刻只能有一个线程持有锁
- 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁
- 锁超时:和J.U.C中的锁一样支持锁超时,防止死锁
- 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效
- 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒
分布式锁的实现方式
一般实现分布式锁有以下几种方式:
- 基于数据库
- 基于Redis
- 基于zookeeper
Redis普通分布式锁存在的问题
说道Redis分布式锁大部分人都会想到:setnx+lua,或者知道set key value px milliseconds nx。
后一种方式的核心实现命令如下:
- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000
- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这种实现方式有3大要点(也是面试概率非常高的地方):
- set命令要用
set key value px milliseconds nx
- value要具有唯一性;
- 释放锁时要验证value值,不能误解锁;
事实上这类琐最大的缺点,就是它加锁时,只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点,由于某些原因发生了主从切换,那么就会出现锁丢失的情况:
- 在Redis的master节点上拿到了锁,但是这个加锁的key还没有同步到slave节点;
- master故障,发生故障转移,slave节点升级为master节点,导致锁丢失。
正因为如此,Redis作者antirez,基于分布式环境下,提出了一种更高级的分布式锁的实现方式:Redlock。
Redis高级分布式锁:Redlock
Redlock也是Redis所有分布式锁实现方式中,唯一能让面试官高潮的方式。
Redlock实现,是antirez提出的redlock算法大概是这样的:
- 在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
- 我们确保将在N个实例上,使用与在Redis单实例下,相同方法获取和释放锁。
- 现在我们假设有5个Redis master节点,同时我们需要在5台服务器上,运行这些Redis实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。
- 当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。
- 例如你的锁,自动失效时间为10秒,则超时时间,应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。
- 如果服务器端没有在规定时间内响应,客户端应该尽快尝试去,另外一个Redis实例请求获取锁。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。
- 当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间,等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例,取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例,上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁,但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
Redlock源码
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。
POM依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
用法
首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:
Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
.setMasterName("masterName")
.setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try
isLock = redLock.tryLock();
// 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (isLock)
//TODO if get lock success, do something;
catch (Exception e)
finally
// 无论如何, 最后都要解锁
redLock.unlock();
唯一ID
实现分布式锁的一个非常重要的点,就是set的value要具有唯一性。
redisson的value是怎样保证value的唯一性呢?
答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId)
return id + ":" + threadId;
获取锁
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)
internalLockLeaseTime = unit.toMillis(leaseTime);
// 获取锁时向5个redis实例发送的命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
获取锁的命令中,
- KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
- ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;
- ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:
释放锁
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId)
// 向5个redis实例都执行如下命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式锁KEY不存在,那么向channel发布一条消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是当前线程占有分布式锁,那么将重入次数减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
RedisLock实现的分布式锁工具类
利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。
1. 自定义注解
自定义一个注解,被注解的方法会执行获取分布式锁的逻辑
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock
/**
* 业务键
*
* @return
*/
String key();
/**
* 锁的过期秒数,默认是5秒
*
* @return
*/
int expire() default 5;
/**
* 尝试加锁,最多等待时间
*
* @return
*/
long waitTime() default Long.MIN_VALUE;
/**
* 锁的超时时间单位
*
* @return
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
2. AOP拦截器实现
在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:
@Aspect
@Component
public class LockMethodAspect
@Autowired
private RedisLockHelper redisLockHelper;
@Autowired
private JedisUtil jedisUtil;
private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);
@Around("@annotation(com.redis.lock.annotation.RedisLock)")
public Object around(ProceedingJoinPoint joinPoint)
Jedis jedis = jedisUtil.getJedis();
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
RedisLock redisLock = method.getAnnotation(RedisLock.class);
String value = UUID.randomUUID().toString();
String key = redisLock.key();
try
final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
logger.info("isLock : ",islock);
if (!islock)
logger.error("获取锁失败");
throw new RuntimeException("获取锁失败");
try
return joinPoint.proceed();
catch (Throwable throwable)
throw new RuntimeException("系统异常");
finally
logger.info("释放锁");
redisLockHelper.unlock(jedis,key, value);
jedis.close();
3. Redis实现分布式锁核心类
@Component
public class RedisLockHelper
private long sleepTime = 100;
/**
* 直接使用setnx + expire方式获取分布式锁
* 非原子性
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock_setnx(Jedis jedis,String key, String value, int timeout)
Long result = jedis.setnx(key, value);
// result = 1时,设置成功,否则设置失败
if (result == 1L)
return jedis.expire(key, timeout) == 1L;
else
return false;
/**
* 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
*
* @param jedis
* @param key
* @param UniqueId
* @param seconds
* @return
*/
public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds)
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(UniqueId);
values.add(String.valueOf(seconds));
Object result = jedis.eval(lua_scripts, keys, values);
//判断是否成功
return result.equals(1L);
/**
* 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
*
* @param key
* @param value
* @param timeout
* @return
*/
public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit)
long seconds = timeUnit.toSeconds(timeout);
return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
/**
* 自定义获取锁的超时时间
*
* @param jedis
* @param key
* @param value
* @param timeout
* @param waitTime
* @param timeUnit
* @return
* @throws InterruptedException
*/
public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException
long seconds = timeUnit.toSeconds(timeout);
while (waitTime >= 0)
String result = jedis.set(key, value, "nx", "ex", seconds);
if ("OK".equals(result))
return true;
waitTime -= sleepTime;
Thread.sleep(sleepTime);
return false;
/**
* 错误的解锁方法—直接删除key
*
* @param key
*/
public void unlock_with_del(Jedis jedis,String key)
jedis.del(key);
/**
* 使用Lua脚本进行解锁操纵,解锁的时候验证value值
*
* @param jedis
* @param key
* @param value
* @return
*/
public boolean unlock(Jedis jedis,String key,String value)
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
4. Controller层控制
定义一个TestController来测试我们实现的分布式锁
@RestController
public class TestController
@RedisLock(key = "redis_lock")
@GetMapping("/index")
public String index()
return "index";
@RestController
public class TestController
@RedisLock(key = "redis_lock")
@GetMapping("/index")
public String index()
return "index";
redission分布式锁防止重复初始化问题
配置地址:
redisson:
# Redis服务地址 如果集群使用","进行分割
server-address: redis://$spring.redis.host:$spring.redis.port
database: $spring.redis.database
创建配置类:
@ConfigurationProperties(prefix = "redisson")
@Configuration
public class RedissonConfig
/** Redis服务地址 如果集群使用","进行分割 */
private String serverAddress;
private Integer database;
public String getServerAddress()
return serverAddress;
public void setServerAddress(String serverAddresss)
this.serverAddress = serverAddresss;
public Integer getDatabase()
return database;
public void setDatabase(Integer database)
this.database = database;
注册redission Bean:
/**
* RedissonClient
* @return
*/
@Bean
public RedissonClient redissonClient()
String splitChar = ",";
String serverAddress = redissonConfig.getServerAddress();
String[] serverAddressArr = serverAddress.split(splitChar);
Config config = new Config();
if (serverAddressArr.length == 1)
//单例redis
config.useSingleServer()
.setAddress(redissonConfig.getServerAddress())
.setDatabase(redissonConfig.getDatabase());
else
//集群redis
config.useClusterServers().addNodeAddress(serverAddressArr);
return Redisson.create(config);
防止重复初始化:
/**
* 初始化告警统计记录 保证告警统计不会被重复初始化
*
* @param areaId 部门id
* @throws BizException
*/
public void initWarningStatisticsSafe(String areaId) throws BizException
//创建锁名称
String lockName = generateInitWarningStatisticsLockName(areaId);
RLock lock = redisClient.getLock(lockName);
try
//尝试获取锁 (最多尝试10秒,获取到后15秒后释放锁)
boolean isAcquireLock = lock.tryLock(10, 15, TimeUnit.SECONDS);
if (!isAcquireLock)
logger.error("初始化消息统计 --- 获取锁失败 lockName = " + lockName);
throw new BizException("系统异常");
else
try
//查询【告警统计】 (能够查询到最新的数据,因为默认隔离级别是read committed
List<WarningStatisticsPO> warningStatisticsPOS = warningStatisticsDAO.selectByArea(areaId);
if (CollectionUtils.isEmpty(warningStatisticsPOS))
//如果部门不存在告警统计,才进行初始化 initWarningStatistics()会开启一个新事务
warningStatisticsService.initWarningStatistics(areaId);
finally
//释放锁 (在事务提交后才释放锁。保证其它事务在获取锁后能查询到数据,不会再进行初始化。)
lock.unlock();
catch (InterruptedException e)
e.printStackTrace();
以上是关于详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题的主要内容,如果未能解决你的问题,请参考以下文章
详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题
详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题
详解redission分布式锁配置及使用,防止java服务重复提交和修改动作等问题