分布式锁
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁相关的知识,希望对你有一定的参考价值。
参考技术A与分布式锁对应的是【单机锁】,我们在写多线程程序时,避免同时操作一个共享变量而产生数据问题,通常会使用一把锁来实现【互斥】,其使用范围是在【同一个进程中】。(同一个进程内存是共享的,以争抢同一段内存,来判断是否抢到锁)。
如果是多个进程,如何互斥呢。就要引入【分布式锁】来解决这个问题。想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上去【申请加锁】。
而这个外部系统,必须要实现【互斥】的能力,即两个请求同时进来,只会给一个进程返回成功,另一个返回失败(或等待)。
这个外部系统,可以是 mysql,也可以是 Redis 或 Zookeeper。但为了追求更好的性能,我们通常会选择使用 Redis 或 Zookeeper 来做。
依赖mysql的行锁 select for update。
一个特例,唯一索引。唯一索引是找到了就直接停止遍历,非唯一索引还会向后遍历一行。移步第八个case。
现在的索引:
分析:
单条命中只会加行锁,不加间隙锁。所以RC/RR是一样的。
事务1对id=10这条记录加行锁。所以 场景1会锁等待,场景2不会锁等待 。
分析:
RC隔离级别:
事务1未命中,不会加任何锁。所以 场景1,场景2,场景3都不会锁等待 。
RR隔离级别:
事务1未命中,会加间隙锁。因为主键查询,只会对主键加锁。
在10和18加间隙锁。
间隙锁和查询不冲突。 场景1不会锁等待 。
间隙锁和插入冲突。 场景2和场景3会锁等待 。
分析
单条命中只会加行锁,不加间隙锁。所以RC/RR是一样的。
事务1对二级索引和主键索引加行锁。 事务1和事务2都会发生锁等待 。
分析
RC隔离级别:
事务1未命中,不会加任何锁。所以 场景1,场景2,场景3都不会锁等待 。
RR隔离级别:
事务1对二级索引N0007到正无穷上间隙锁,主键索引不上锁。 场景1会锁等待,场景2不会锁等待 。
分析:
RC隔离级别:
只会加行锁。 场景1场景2会锁等待 。 场景3不会发生锁等待 。
RR隔离级别:
会加行锁和间隙锁。 场景1场景2场景3都会锁等待 。
ps: 如果是唯一索引,只会加行锁。非唯一才会加间隙锁。
RC隔离级别:
事务1未命中,不会加任何锁。所以 场景1,场景2都不会锁等待 。
RR隔离级别:
事务1未命中,会加间隙锁。间隙锁与查询不冲突, 场景1不会发生锁等待 。 场景2会发生锁等待 。
分析
RC隔离级别:
事务1加了三个行锁。 场景1会锁等待。场景2,场景3不会发生锁等待 。
RR隔离级别:
事务1加个三个行锁和间隙锁。 场景1,场景3会发生锁等待 。间隙锁与查询不冲突, 场景2不会锁等待。
分析:
RC隔离级别:
事务1加的都是行锁。 场景1会发生锁等待 , 场景2,场景3不会发生锁等待 。
RR隔离级别:
事务1会对二级索引加行锁和间隙锁,对主键索引加行锁。
场景1,场景3会发生锁等待 。间隙锁与查询不冲突, 场景2不会锁等待。
这么看,二级索引和唯一索引没什么区别。
那如果是 select * from book where name < \'Jim\' for update; 呢
如果name是唯一索引。因为找到jim就不会向后遍历了,所以jim和rose之间不会有间隙锁。
分析:
RC隔离级别:
由于没有走索引,所以只能全表扫描。在命中的主键索引上加行锁。 场景1会锁等待,场景2不会锁等待 。
RR隔离级别:
不开启innodb_locks_unsafe_for_binlog。 会发生锁表 。
开启innodb_locks_unsafe_for_binlog。和RC隔离级别一样。
RC隔离级别:
未命中不加锁。 场景1,场景2都不会锁等待 。
RR隔离级别:
未命中, 锁表 。
在RR隔离级别下,where条件没有索引,都会锁表。
加锁命令:
释放锁命令:
这里存在问题,当释放锁之前异常退出了。这个锁就永远不会被释放了。
怎么解决呢?加一个超时时间。
还有问题,不是原子操作。
redis 2.6.12之后,redis天然支持了
来看一下还有什么问题:
试想这样一个场景
看到了么,这里存在两个严重的问题:
释放别人的锁 :客户端 1 操作共享资源完成后,却又释放了客户端 2 的锁
锁过期 :客户端 1 操作共享资源耗时太久,导致锁被自动释放,之后被客户端 2 持有
解决办法是:客户端在加锁时,设置一个只有自己知道的【唯一标识】进去。
例如,可以是自己的线程id,也可以是一个uuid
在释放锁时,可以这么写:
问题来了,还不是原子的。redis没有原生命令了。这里需要使用lua脚本
锁的过期时间如果评估不好,这个锁就会有「提前」过期的风险,一般的妥协方案是,尽量「冗余」过期时间,降低锁提前过期的概率。
其实可以有比较好的方案:
加锁时,先设置一个过期时间,然后我们开启一个「守护****线程****」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期」,重新设置过期时间。
这个守护线程我们一般把他叫做【看门狗】线程。
我们在使用 Redis 时,一般会采用 主从集群 + 哨兵 的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现「故障自动切换」,把从库提升为主库,继续提供服务,以此保证可用性。
试想这样的场景:
为此,Redis 的作者提出一种解决方案,就是我们经常听到的 Redlock(红锁) 。
现在我们来看,Redis 作者提出的 Redlock 方案,是如何解决主从切换后,锁失效问题的。
Redlock 的方案基于 2 个前提:
也就是说,想用使用 Redlock,你至少要部署 5 个 Redis 实例,而且都是主库,它们之间没有任何关系,都是一个个孤立的实例。
Redlock 具体如何使用呢?
整体的流程是这样的,一共分为 5 步:
有 4 个重点:
1) 为什么要在多个实例上加锁?
本质上是为了「容错」,部分实例异常宕机,剩余的实例加锁成功,整个锁服务依旧可用。
2) 为什么大多数加锁成功,才算成功?
多个 Redis 实例一起来用,其实就组成了一个「分布式系统」。
在分布式系统中,总会出现「异常节点」,所以,在谈论分布式系统问题时,需要考虑异常节点达到多少个,也依旧不会影响整个系统的「正确性」。
这是一个分布式系统「容错」问题,这个问题的结论是: 如果只存在「故障」节点,只要大多数节点正常,那么整个系统依旧是可以提供正确服务的。
3) 为什么步骤 3 加锁成功后,还要计算加锁的累计耗时?
因为操作的是多个节点,所以耗时肯定会比操作单个实例耗时更久,而且,因为是网络请求,网络情况是复杂的,有可能存在 延迟、丢包、超时 等情况发生,网络请求越多,异常发生的概率就越大。
所以,即使大多数节点加锁成功,但如果加锁的累计耗时已经「超过」了锁的过期时间,那此时有些实例上的锁可能已经失效了,这个锁就没有意义了。
4) 为什么释放锁,要操作所有节点?
在某一个 Redis 节点加锁时,可能因为「网络原因」导致加锁失败。
例如,客户端在一个 Redis 实例上加锁成功,但在读取响应结果时,网络问题导致 读取失败 ,那这把锁其实已经在 Redis 上加锁成功了。
所以,释放锁时,不管之前有没有加锁成功,需要释放「所有节点」的锁,以保证清理节点上「残留」的锁。
好了,明白了 Redlock 的流程和相关问题,看似 Redlock 确实解决了 Redis 节点异常宕机锁失效的问题,保证了锁的「安全性」。
在martin的文章中,主要阐述了4个论点:
第一:效率 。
使用分布式锁的互斥能力,是避免不必要地做同样的工作两次。如果锁失效,并不会带来「恶性」的后果,例如发了 2 次邮件等,无伤大雅。
第二:正确性 。
使用锁用来防止并发进程相互干扰。如果锁失败,会造成多个进程同时操作同一条数据,产生的后果是数据严重错误、永久性不一致、数据丢失等恶性问题,后果严重。
他认为,如果你是为了前者——效率,那么使用单机版redis就可以了,即使偶尔发生锁失效(宕机、主从切换),都不会产生严重的后果。而使用redlock太重了,没必要。
而如果是为了正确性,他认为redlock根本达不到安全性的要求,也依旧存在锁失效的问题!
一个分布式系统,存在着你想不到的各种异常。这些异常场景主要包括三大块,这也是分布式系统会遇到的三座大山: NPC
martin用一个进程暂停(GC)的例子,指出了redlock安全性的问题:
又或者,当多个Redis节点时钟发生了问题时,也会导致redlock锁失效。
在混乱的分布式系统中,你不能假设系统时钟就是对的。
个人理解,相当于在业务层再做一层乐观锁。
一个好的分布式锁,无论 NPC 怎么发生,可以不在规定时间内给出结果,但并不会给出一个错误的结果。也就是只会影响到锁的「性能」(或称之为活性),而不会影响它的「正确性」。
1、Redlock 不伦不类 :它对于效率来讲,Redlock 比较重,没必要这么做,而对于正确性来说,Redlock 是不够安全的。
2、时钟假设不合理 :该算法对系统时钟做出了危险的假设(假设多个节点机器时钟都是一致的),如果不满足这些假设,锁就会失效。
3、无法保证正确性 :Redlock 不能提供类似 fencing token 的方案,所以解决不了正确性的问题。为了正确性,请使用有「共识系统」的软件,例如 Zookeeper。
好了,以上就是 Martin 反对使用 Redlock 的观点,看起来有理有据。
下面我们来看 Redis 作者 Antirez 是如何反驳的。
首先,Redis 作者一眼就看穿了对方提出的最为核心的问题: 时钟问题 。
Redis 作者表示,Redlock 并不需要完全一致的时钟,只需要大体一致就可以了,允许有「误差」。
例如要计时 5s,但实际可能记了 4.5s,之后又记了 5.5s,有一定误差,但只要不超过「误差范围」锁失效时间即可,这种对于时钟的精度的要求并不是很高,而且这也符合现实环境。
对于对方提到的「时钟修改」问题,Redis 作者反驳到:
Redis 作者继续论述,如果对方认为,发生网络延迟、进程 GC 是在客户端确认拿到了锁,去操作共享资源的途中发生了问题,导致锁失效,那这 不止是 Redlock 的问题,任何其它锁服务例如 Zookeeper,都有类似的问题,这不在讨论范畴内 。
这里我举个例子解释一下这个问题:
Redis 作者这里的结论就是:
所以,Redis 作者认为 Redlock 在保证时钟正确的基础上,是可以保证正确性的。
这个方案必须要求要操作的「共享资源服务器」有拒绝「旧 token」的能力。
例如,要操作 MySQL,从锁服务拿到一个递增数字的 token,然后客户端要带着这个 token 去改 MySQL 的某一行,这就需要利用 MySQL 的「事物隔离性」来做。
但如果操作的不是 MySQL 呢?例如向磁盘上写一个文件,或发起一个 HTTP 请求,那这个方案就无能为力了,这对要操作的资源服务器,提出了更高的要求。
也就是说,大部分要操作的资源服务器,都是没有这种互斥能力的。
再者,既然资源服务器都有了「互斥」能力,那还要分布式锁干什么?
利用 zookeeper 的同级节点的唯一性特性,在需要获取排他锁时,所有的客户端试图通过调用 create() 接口,在 /exclusive_lock 节点下创建临时子节点 /exclusive_lock/lock ,最终只有一个客户端能创建成功,那么此客户端就获得了分布式锁。同时,所有没有获取到锁的客户端可以在 /exclusive_lock 节点上注册一个子节点变更的 watcher 监听事件,以便重新争取获得锁。
锁释放依赖心跳。集群中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode锁的客户端与ZooKeeper集群服务器失去联系,这个临时Znode也将自动删除
zookeeper的高可用依赖zab。简单的说就是写入时,半数follower ack,写入成功。
zk是100%安全的么:
分析一个例子:
所以,得出一个结论: 一个分布式锁,在极端情况下,不一定是安全的。
redlock运维成本也比较高。单机有高可用问题。所以还是主从+哨兵这样的部署方式会好一些。
redis的缺点是:不是100%可靠。
mysql的缺点是:扛不住高流量请求。
可以二者结合,先用redis做分布式锁,扛住大部分流量缓解mysql压力。最后一定要用mysql做兜底保证100%的正确性。
SpringBoot--防止重复提交(锁机制---本地锁分布式锁)
防止重复提交,主要是使用锁的形式来处理,如果是单机部署,可以使用本地缓存锁(Guava)即可,如果是分布式部署,则需要使用分布式锁(可以使用zk分布式锁或者redis分布式锁),本文的分布式锁以redis分布式锁为例。
一、本地锁(Guava)
1、导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>
2、自定义本地锁注解
package com.example.demo.utils; import java.lang.annotation.*; @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface LocalLock { String key() default ""; //过期时间,使用本地缓存可以忽略,如果使用redis做缓存就需要 int expire() default 5; }
3、本地锁注解实现
package com.example.demo.utils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; @Aspect @Configuration public class LockMethodInterceptor { //定义缓存,设置最大缓存数及过期日期 private static final Cache<String,Object> CACHE = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(20, TimeUnit.SECONDS).build(); @Around("execution(public * *(..)) && @annotation(com.example.demo.utils.LocalLock)") public Object interceptor(ProceedingJoinPoint joinPoint){ MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); LocalLock localLock = method.getAnnotation(LocalLock.class); String key = getKey(localLock.key(),joinPoint.getArgs()); if(!StringUtils.isEmpty(key)){ if(CACHE.getIfPresent(key) != null){ throw new RuntimeException("请勿重复请求!"); } CACHE.put(key,key); } try{ return joinPoint.proceed(); }catch (Throwable throwable){ throw new RuntimeException("服务器异常"); }finally { } } private String getKey(String keyExpress, Object[] args){ for (int i = 0; i < args.length; i++) { keyExpress = keyExpress.replace("arg[" + i + "]", args[i].toString()); } return keyExpress; } }
4、控制层
@ResponseBody @PostMapping(value ="/localLock") @ApiOperation(value="重复提交验证测试--使用本地缓存锁") @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")}) @LocalLock(key = "localLock:test:arg[0]") public String localLock(String token){ return "sucess====="+token; }
5、测试
第一次请求:
未过期再次访问:
二、Redis分布式锁
1、导入依赖
导入aop依赖和redis依赖即可
2、配置
配置redis连接信息即可
3、自定义分布式锁注解
package com.example.demo.utils; import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheLock { //redis锁前缀 String prefix() default ""; //redis锁过期时间 int expire() default 5; //redis锁过期时间单位 TimeUnit timeUnit() default TimeUnit.SECONDS; //redis key分隔符 String delimiter() default ":"; }
4、自定义key规则注解
由于redis的key可能是多层级结构,例如 redistest:demo1:token:kkk这种形式,因此需要自定义key的规则。
package com.example.demo.utils; import java.lang.annotation.*; @Target({ElementType.METHOD,ElementType.PARAMETER,ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheParam { String name() default ""; }
5、定义key生成策略接口
package com.example.demo.service; import org.aspectj.lang.ProceedingJoinPoint; import org.springframework.stereotype.Service; public interface CacheKeyGenerator { //获取AOP参数,生成指定缓存Key String getLockKey(ProceedingJoinPoint joinPoint); }
6、定义key生成策略实现类
package com.example.demo.service.impl; import com.example.demo.service.CacheKeyGenerator; import com.example.demo.utils.CacheLock; import com.example.demo.utils.CacheParam; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Parameter; public class CacheKeyGeneratorImp implements CacheKeyGenerator { @Override public String getLockKey(ProceedingJoinPoint joinPoint) { //获取连接点的方法签名对象 MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); //Method对象 Method method = methodSignature.getMethod(); //获取Method对象上的注解对象 CacheLock cacheLock = method.getAnnotation(CacheLock.class); //获取方法参数 final Object[] args = joinPoint.getArgs(); //获取Method对象上所有的注解 final Parameter[] parameters = method.getParameters(); StringBuilder sb = new StringBuilder(); for(int i=0;i<parameters.length;i++){ final CacheParam cacheParams = parameters[i].getAnnotation(CacheParam.class); //如果属性不是CacheParam注解,则不处理 if(cacheParams == null){ continue; } //如果属性是CacheParam注解,则拼接 连接符(:)+ CacheParam sb.append(cacheLock.delimiter()).append(args[i]); } //如果方法上没有加CacheParam注解 if(StringUtils.isEmpty(sb.toString())){ //获取方法上的多个注解(为什么是两层数组:因为第二层数组是只有一个元素的数组) final Annotation[][] parameterAnnotations = method.getParameterAnnotations(); //循环注解 for(int i=0;i<parameterAnnotations.length;i++){ final Object object = args[i]; //获取注解类中所有的属性字段 final Field[] fields = object.getClass().getDeclaredFields(); for(Field field : fields){ //判断字段上是否有CacheParam注解 final CacheParam annotation = field.getAnnotation(CacheParam.class); //如果没有,跳过 if(annotation ==null){ continue; } //如果有,设置Accessible为true(为true时可以使用反射访问私有变量,否则不能访问私有变量) field.setAccessible(true); //如果属性是CacheParam注解,则拼接 连接符(:)+ CacheParam sb.append(cacheLock.delimiter()).append(ReflectionUtils.getField(field,object)); } } } //返回指定前缀的key return cacheLock.prefix() + sb.toString(); } }
7、分布式注解实现
package com.example.demo.utils; import com.example.demo.service.CacheKeyGenerator; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.util.StringUtils; import java.lang.reflect.Method; @Aspect @Configuration public class CacheLockMethodInterceptor { @Autowired public CacheLockMethodInterceptor(StringRedisTemplate stringRedisTemplate, CacheKeyGenerator cacheKeyGenerator){ this.cacheKeyGenerator = cacheKeyGenerator; this.stringRedisTemplate = stringRedisTemplate; } private final StringRedisTemplate stringRedisTemplate; private final CacheKeyGenerator cacheKeyGenerator; @Around("execution(public * * (..)) && @annotation(com.example.demo.utils.CacheLock)") public Object interceptor(ProceedingJoinPoint joinPoint){ MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); Method method = methodSignature.getMethod(); CacheLock cacheLock = method.getAnnotation(CacheLock.class); if(StringUtils.isEmpty(cacheLock.prefix())){ throw new RuntimeException("前缀不能为空"); } //获取自定义key final String lockkey = cacheKeyGenerator.getLockKey(joinPoint); final Boolean success = stringRedisTemplate.execute( (RedisCallback<Boolean>) connection -> connection.set(lockkey.getBytes(), new byte[0], Expiration.from(cacheLock.expire(), cacheLock.timeUnit()) , RedisStringCommands.SetOption.SET_IF_ABSENT)); if (!success) { // TODO 按理来说 我们应该抛出一个自定义的 CacheLockException 异常;这里偷下懒 throw new RuntimeException("请勿重复请求"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } }
8、主函数调整
主函数引入key生成策略
@Bean public CacheKeyGenerator cacheKeyGenerator(){ return new CacheKeyGeneratorImp(); }
9、Controller
@ResponseBody @PostMapping(value ="/cacheLock") @ApiOperation(value="重复提交验证测试--使用redis锁") @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")}) //@CacheLock @CacheLock() public String cacheLock(String token){ return "sucess====="+token; } @ResponseBody @PostMapping(value ="/cacheLock1") @ApiOperation(value="重复提交验证测试--使用redis锁") @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")}) //@CacheLock @CacheLock(prefix = "redisLock.test",expire = 20) public String cacheLock1(String token){ return "sucess====="+token; } @ResponseBody @PostMapping(value ="/cacheLock2") @ApiOperation(value="重复提交验证测试--使用redis锁") @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")}) //@CacheLock @CacheLock(prefix = "redisLock.test",expire = 20) public String cacheLock2(@CacheParam(name = "token") String token){ return "sucess====="+token; }
10、测试
(1)由于cacheLock方法的CacheLock注解没有加prefix前缀,因此会报错
(2)没有加CacheParam注解
第一次调用:
缓存信息:
可以发现key为prifix的值
第二次调用:
(3)增加了CacheParam注解
第一次调用:
缓存信息:
可以发现缓存的内容为prefix+@CacheParam
第二次调用:
以上是关于分布式锁的主要内容,如果未能解决你的问题,请参考以下文章