不用找了,基于 Redis 的分布式锁实战来了!
Posted Java技术栈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了不用找了,基于 Redis 的分布式锁实战来了!相关的知识,希望对你有一定的参考价值。
Java技术栈
www.javastack.cn
my.oschina.net/wnjustdoit/blog/1606215
那么,本文主要来讨论基于redis的分布式锁算法问题。
-
EX seconds – 设置键key的过期时间,单位时秒 -
PX milliseconds – 设置键key的过期时间,单位时毫秒 -
NX – 只有键key不存在的时候才会设置key的值 -
XX – 只有键key存在的时候才会设置key的值
中文地址:http://redis.cn/commands/set.html
注意: 由于SET命令加上选项已经可以完全取代SETNX, SETEX, PSETEX的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。
1、死锁问题;
2、锁释放问题,这里会有两个问题;
3、更可靠的锁;
关于Martin Kleppmann的Redlock的分析
中文地址:http://redis.cn/topics/distlock.html
-
安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。 -
活性A(Liveness property A): 无死锁。即便持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取。 -
活性B(Liveness property B): 容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁.
我们来分析一下:
更进一步分析,结合上文提到的关键问题,这里可以引申出另外的两个问题:
-
怎么才能合理判断程序真正处理的有效时间范围?(这里有个时间偏移的问题) -
redis Master节点宕机后恢复(可能还没有持久化到磁盘)、主从节点切换,(N/2)+1这里的N应该怎么动态计算更合理?
最后,来点实践吧
I、传统的单实例redis分布式锁实现(关键步骤)
SET resource_name my_random_value NX PX 30000
手动删除锁(Lua脚本):
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
II、分布式环境的redis(多master节点)的分布式锁实现
为了保证在尽可能短的时间内获取到(N/2)+1个节点的锁,可以并行去获取各个节点的锁(当然,并行可能需要消耗更多的资源,因为串行只需要count到足够数量的锁就可以停止获取了);
另外,怎么动态实时统一获取redis master nodes需要更进一步去思考了。
QA,补充一下说明(以下为我与朋友沟通的情况,以说明文中大家可能不够明白的地方):
1、在关键问题2.1中,删除就删除了,会造成什么问题?
线程A超时,准备删除锁;但此时的锁属于线程B;线程B还没执行完,线程A把锁删除了,这时线程C获取到锁,同时执行程序;所以不能乱删。
2、在关键问题2.2中,只要在key生成时,跟线程相关就不用考虑这个问题了吗?
不同的线程执行程序,线程之间肯虽然有差异呀,然后在redis锁的value设置有线程信息,比如线程id或线程名称,是分布式环境的话加个机器id前缀咯(类似于twitter的snowflake算法!),但是在del命令只会涉及到key,不会再次检查value,所以还是需要lua脚本控制if(condition){xxx}的原子性。
3、那要不要考虑锁的重入性?
不需要重入;try…finally 没得重入的场景;对于单个线程来说,执行是串行的,获取锁之后必定会释放,因为finally的代码必定会执行啊(只要进入了try块,finally必定会执行)。
4、为什么两个线程都会去删除锁?(貌似重复的问题。不管怎样,还是耐心解答吧)
每个线程只能管理自己的锁,不能管理别人线程的锁啊。这里可以联想一下ThreadLocal。
5、如果加锁的线程挂了怎么办?只能等待自动超时?
看你怎么写程序的了,一种是问题3的回答;另外,那就自动超时咯。这种情况也适用于网络over了。
6、时间太长,程序异常就会蛋疼,时间太短,就会出现程序还没有处理完就超时了,这岂不是很尴尬?
是呀,所以需要更好的衡量这个超时时间的设置。
实践部分主要代码:
RedisLock工具类:
package com.caiya.cms.web.component;
import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* redis实现分布式锁
* 可实现特性:
* 1、使多线程无序排队获取和释放锁;
* 2、丢弃未成功获得锁的线程处理;
* 3、只释放线程本身加持的锁;
* 4、避免死锁
*
* @author wangnan
* @since 1.0
*/
public final class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 尝试加锁(仅一次)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时间(秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
return Objects.equals(response, "OK");
} finally {
jedisCache.close();
}
}
/**
* 加锁(指定最大尝试次数范围内)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时间(秒)
* @param tryTimes 最大尝试次数
* @param sleepMillis 每两次尝试之间休眠时间(毫秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
boolean result;
int count = 0;
do {
count++;
result = tryLock(lockKey, lockValue, expireSeconds);
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} while (!result && count <= tryTimes);
return result;
}
/**
* 释放锁
*
* @param lockKey 锁key
* @param lockValue 锁value
*/
public static void unlock(String lockKey, String lockValue) {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
// Objects.equals(result, 1L);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisCache.close();
}
// return false;
}
private RedisLock() {
}
}
使用工具类的代码片段1:
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟业务相关的唯一拼接键
String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的唯一值
boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只尝试一次,在本次处理过程中直接拒绝其他线程的请求
if (!locked) {
throw new IllegalAccessException("您的操作太频繁了,休息一下再来吧~");
}
try {
// 开始处理核心业务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);// 在finally块中释放锁
}
使用工具类的代码片段2:
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平锁,无序竞争(这里需要合理根据业务处理情况设置最大尝试次数和每次休眠时间)
if (!locked) {
throw new IllegalAccessException("系统太忙,本次操作失败");// 一般来说,不会走到这一步;如果真的有这种情况,并且在合理设置锁尝试次数和等待响应时间之后仍然处理不过来,可能需要考虑优化程序响应时间或者用消息队列排队执行了
}
try {
// 开始处理核心业务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);
}
...
附加:
基于redis的分布式锁实现客户端Redisson:
https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers
基于zookeeper的分布式锁实现:
http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
- END -
最近干货分享
点击「阅读原文」加入栈长的战队~
以上是关于不用找了,基于 Redis 的分布式锁实战来了!的主要内容,如果未能解决你的问题,请参考以下文章
分布式锁三大技术方案实战——基于redis方式实现分布式锁(终极篇)