分布式锁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式锁相关的知识,希望对你有一定的参考价值。

参考技术A

与分布式锁对应的是【单机锁】,我们在写多线程程序时,避免同时操作一个共享变量而产生数据问题,通常会使用一把锁来实现【互斥】,其使用范围是在【同一个进程中】。(同一个进程内存是共享的,以争抢同一段内存,来判断是否抢到锁)。

如果是多个进程,如何互斥呢。就要引入【分布式锁】来解决这个问题。想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上去【申请加锁】。

而这个外部系统,必须要实现【互斥】的能力,即两个请求同时进来,只会给一个进程返回成功,另一个返回失败(或等待)。

这个外部系统,可以是 mysql,也可以是 Redis 或 Zookeeper。但为了追求更好的性能,我们通常会选择使用 Redis 或 Zookeeper 来做。

依赖mysql的行锁 select for update。

一个特例,唯一索引。唯一索引是找到了就直接停止遍历,非唯一索引还会向后遍历一行。移步第八个case。

现在的索引:

分析:

单条命中只会加行锁,不加间隙锁。所以RC/RR是一样的。

事务1对id=10这条记录加行锁。所以 场景1会锁等待,场景2不会锁等待

分析:

RC隔离级别:

事务1未命中,不会加任何锁。所以 场景1,场景2,场景3都不会锁等待

RR隔离级别:

事务1未命中,会加间隙锁。因为主键查询,只会对主键加锁。

在10和18加间隙锁。

间隙锁和查询不冲突。 场景1不会锁等待

间隙锁和插入冲突。 场景2和场景3会锁等待

分析

单条命中只会加行锁,不加间隙锁。所以RC/RR是一样的。

事务1对二级索引和主键索引加行锁。 事务1和事务2都会发生锁等待

分析

RC隔离级别:

事务1未命中,不会加任何锁。所以 场景1,场景2,场景3都不会锁等待

RR隔离级别:

事务1对二级索引N0007到正无穷上间隙锁,主键索引不上锁。 场景1会锁等待,场景2不会锁等待

分析:

RC隔离级别:

只会加行锁。 场景1场景2会锁等待 场景3不会发生锁等待

RR隔离级别:

会加行锁和间隙锁。 场景1场景2场景3都会锁等待

ps: 如果是唯一索引,只会加行锁。非唯一才会加间隙锁。

RC隔离级别:

事务1未命中,不会加任何锁。所以 场景1,场景2都不会锁等待

RR隔离级别:

事务1未命中,会加间隙锁。间隙锁与查询不冲突, 场景1不会发生锁等待 场景2会发生锁等待

分析

RC隔离级别:

事务1加了三个行锁。 场景1会锁等待。场景2,场景3不会发生锁等待

RR隔离级别:

事务1加个三个行锁和间隙锁。 场景1,场景3会发生锁等待 。间隙锁与查询不冲突, 场景2不会锁等待。

分析:

RC隔离级别:

事务1加的都是行锁。 场景1会发生锁等待 场景2,场景3不会发生锁等待

RR隔离级别:

事务1会对二级索引加行锁和间隙锁,对主键索引加行锁。

场景1,场景3会发生锁等待 。间隙锁与查询不冲突, 场景2不会锁等待。

这么看,二级索引和唯一索引没什么区别。

那如果是 select * from book where name < \'Jim\' for update; 呢

如果name是唯一索引。因为找到jim就不会向后遍历了,所以jim和rose之间不会有间隙锁。

分析:

RC隔离级别:

由于没有走索引,所以只能全表扫描。在命中的主键索引上加行锁。 场景1会锁等待,场景2不会锁等待

RR隔离级别:

不开启innodb_locks_unsafe_for_binlog。 会发生锁表

开启innodb_locks_unsafe_for_binlog。和RC隔离级别一样。

RC隔离级别:

未命中不加锁。 场景1,场景2都不会锁等待

RR隔离级别:

未命中, 锁表

在RR隔离级别下,where条件没有索引,都会锁表。

加锁命令:

释放锁命令:

这里存在问题,当释放锁之前异常退出了。这个锁就永远不会被释放了。

怎么解决呢?加一个超时时间。

还有问题,不是原子操作。

redis 2.6.12之后,redis天然支持了

来看一下还有什么问题:

试想这样一个场景

看到了么,这里存在两个严重的问题:

释放别人的锁 :客户端 1 操作共享资源完成后,却又释放了客户端 2 的锁

锁过期 :客户端 1 操作共享资源耗时太久,导致锁被自动释放,之后被客户端 2 持有

解决办法是:客户端在加锁时,设置一个只有自己知道的【唯一标识】进去。

例如,可以是自己的线程id,也可以是一个uuid

在释放锁时,可以这么写:

问题来了,还不是原子的。redis没有原生命令了。这里需要使用lua脚本

锁的过期时间如果评估不好,这个锁就会有「提前」过期的风险,一般的妥协方案是,尽量「冗余」过期时间,降低锁提前过期的概率。

其实可以有比较好的方案:

加锁时,先设置一个过期时间,然后我们开启一个「守护****线程****」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期」,重新设置过期时间。

这个守护线程我们一般把他叫做【看门狗】线程。

我们在使用 Redis 时,一般会采用 主从集群 + 哨兵 的模式部署,这样做的好处在于,当主库异常宕机时,哨兵可以实现「故障自动切换」,把从库提升为主库,继续提供服务,以此保证可用性。

试想这样的场景:

为此,Redis 的作者提出一种解决方案,就是我们经常听到的 Redlock(红锁)

现在我们来看,Redis 作者提出的 Redlock 方案,是如何解决主从切换后,锁失效问题的。

Redlock 的方案基于 2 个前提:

也就是说,想用使用 Redlock,你至少要部署 5 个 Redis 实例,而且都是主库,它们之间没有任何关系,都是一个个孤立的实例。

Redlock 具体如何使用呢?

整体的流程是这样的,一共分为 5 步:

有 4 个重点:

1) 为什么要在多个实例上加锁?

本质上是为了「容错」,部分实例异常宕机,剩余的实例加锁成功,整个锁服务依旧可用。

2) 为什么大多数加锁成功,才算成功?

多个 Redis 实例一起来用,其实就组成了一个「分布式系统」。

在分布式系统中,总会出现「异常节点」,所以,在谈论分布式系统问题时,需要考虑异常节点达到多少个,也依旧不会影响整个系统的「正确性」。

这是一个分布式系统「容错」问题,这个问题的结论是: 如果只存在「故障」节点,只要大多数节点正常,那么整个系统依旧是可以提供正确服务的。

3) 为什么步骤 3 加锁成功后,还要计算加锁的累计耗时?

因为操作的是多个节点,所以耗时肯定会比操作单个实例耗时更久,而且,因为是网络请求,网络情况是复杂的,有可能存在 延迟、丢包、超时 等情况发生,网络请求越多,异常发生的概率就越大。

所以,即使大多数节点加锁成功,但如果加锁的累计耗时已经「超过」了锁的过期时间,那此时有些实例上的锁可能已经失效了,这个锁就没有意义了。

4) 为什么释放锁,要操作所有节点?

在某一个 Redis 节点加锁时,可能因为「网络原因」导致加锁失败。

例如,客户端在一个 Redis 实例上加锁成功,但在读取响应结果时,网络问题导致 读取失败 ,那这把锁其实已经在 Redis 上加锁成功了。

所以,释放锁时,不管之前有没有加锁成功,需要释放「所有节点」的锁,以保证清理节点上「残留」的锁。

好了,明白了 Redlock 的流程和相关问题,看似 Redlock 确实解决了 Redis 节点异常宕机锁失效的问题,保证了锁的「安全性」。

在martin的文章中,主要阐述了4个论点:

第一:效率

使用分布式锁的互斥能力,是避免不必要地做同样的工作两次。如果锁失效,并不会带来「恶性」的后果,例如发了 2 次邮件等,无伤大雅。

第二:正确性

使用锁用来防止并发进程相互干扰。如果锁失败,会造成多个进程同时操作同一条数据,产生的后果是数据严重错误、永久性不一致、数据丢失等恶性问题,后果严重。

他认为,如果你是为了前者——效率,那么使用单机版redis就可以了,即使偶尔发生锁失效(宕机、主从切换),都不会产生严重的后果。而使用redlock太重了,没必要。

而如果是为了正确性,他认为redlock根本达不到安全性的要求,也依旧存在锁失效的问题!

一个分布式系统,存在着你想不到的各种异常。这些异常场景主要包括三大块,这也是分布式系统会遇到的三座大山: NPC

martin用一个进程暂停(GC)的例子,指出了redlock安全性的问题:

又或者,当多个Redis节点时钟发生了问题时,也会导致redlock锁失效。

在混乱的分布式系统中,你不能假设系统时钟就是对的。

个人理解,相当于在业务层再做一层乐观锁。

一个好的分布式锁,无论 NPC 怎么发生,可以不在规定时间内给出结果,但并不会给出一个错误的结果。也就是只会影响到锁的「性能」(或称之为活性),而不会影响它的「正确性」。

1、Redlock 不伦不类 :它对于效率来讲,Redlock 比较重,没必要这么做,而对于正确性来说,Redlock 是不够安全的。

2、时钟假设不合理 :该算法对系统时钟做出了危险的假设(假设多个节点机器时钟都是一致的),如果不满足这些假设,锁就会失效。

3、无法保证正确性 :Redlock 不能提供类似 fencing token 的方案,所以解决不了正确性的问题。为了正确性,请使用有「共识系统」的软件,例如 Zookeeper。

好了,以上就是 Martin 反对使用 Redlock 的观点,看起来有理有据。

下面我们来看 Redis 作者 Antirez 是如何反驳的。

首先,Redis 作者一眼就看穿了对方提出的最为核心的问题: 时钟问题

Redis 作者表示,Redlock 并不需要完全一致的时钟,只需要大体一致就可以了,允许有「误差」。

例如要计时 5s,但实际可能记了 4.5s,之后又记了 5.5s,有一定误差,但只要不超过「误差范围」锁失效时间即可,这种对于时钟的精度的要求并不是很高,而且这也符合现实环境。

对于对方提到的「时钟修改」问题,Redis 作者反驳到:

Redis 作者继续论述,如果对方认为,发生网络延迟、进程 GC 是在客户端确认拿到了锁,去操作共享资源的途中发生了问题,导致锁失效,那这 不止是 Redlock 的问题,任何其它锁服务例如 Zookeeper,都有类似的问题,这不在讨论范畴内

这里我举个例子解释一下这个问题:

Redis 作者这里的结论就是:

所以,Redis 作者认为 Redlock 在保证时钟正确的基础上,是可以保证正确性的。

这个方案必须要求要操作的「共享资源服务器」有拒绝「旧 token」的能力。

例如,要操作 MySQL,从锁服务拿到一个递增数字的 token,然后客户端要带着这个 token 去改 MySQL 的某一行,这就需要利用 MySQL 的「事物隔离性」来做。

但如果操作的不是 MySQL 呢?例如向磁盘上写一个文件,或发起一个 HTTP 请求,那这个方案就无能为力了,这对要操作的资源服务器,提出了更高的要求。

也就是说,大部分要操作的资源服务器,都是没有这种互斥能力的。

再者,既然资源服务器都有了「互斥」能力,那还要分布式锁干什么?

利用 zookeeper 的同级节点的唯一性特性,在需要获取排他锁时,所有的客户端试图通过调用 create() 接口,在 /exclusive_lock 节点下创建临时子节点 /exclusive_lock/lock ,最终只有一个客户端能创建成功,那么此客户端就获得了分布式锁。同时,所有没有获取到锁的客户端可以在 /exclusive_lock 节点上注册一个子节点变更的 watcher 监听事件,以便重新争取获得锁。

锁释放依赖心跳。集群中占用锁的客户端失联时,锁能够被有效释放。一旦占用Znode锁的客户端与ZooKeeper集群服务器失去联系,这个临时Znode也将自动删除

zookeeper的高可用依赖zab。简单的说就是写入时,半数follower ack,写入成功。

zk是100%安全的么:

分析一个例子:

所以,得出一个结论: 一个分布式锁,在极端情况下,不一定是安全的。

redlock运维成本也比较高。单机有高可用问题。所以还是主从+哨兵这样的部署方式会好一些。

redis的缺点是:不是100%可靠。

mysql的缺点是:扛不住高流量请求。

可以二者结合,先用redis做分布式锁,扛住大部分流量缓解mysql压力。最后一定要用mysql做兜底保证100%的正确性。

SpringBoot--防止重复提交(锁机制---本地锁分布式锁)

  防止重复提交,主要是使用锁的形式来处理,如果是单机部署,可以使用本地缓存锁(Guava)即可,如果是分布式部署,则需要使用分布式锁(可以使用zk分布式锁或者redis分布式锁),本文的分布式锁以redis分布式锁为例。

  一、本地锁(Guava)

  1、导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>21.0</version>
        </dependency>

  2、自定义本地锁注解

package com.example.demo.utils;

import java.lang.annotation.*;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface LocalLock {
    String key() default "";
    //过期时间,使用本地缓存可以忽略,如果使用redis做缓存就需要
    int expire() default 5;
}

  3、本地锁注解实现

package com.example.demo.utils;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

@Aspect
@Configuration
public class LockMethodInterceptor {
    //定义缓存,设置最大缓存数及过期日期
    private static final Cache<String,Object> CACHE = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(20, TimeUnit.SECONDS).build();

    @Around("execution(public * *(..))  && @annotation(com.example.demo.utils.LocalLock)")
    public Object interceptor(ProceedingJoinPoint joinPoint){
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        LocalLock localLock = method.getAnnotation(LocalLock.class);
        String key = getKey(localLock.key(),joinPoint.getArgs());
        if(!StringUtils.isEmpty(key)){
            if(CACHE.getIfPresent(key) != null){
                throw new RuntimeException("请勿重复请求!");
            }
            CACHE.put(key,key);
        }
        try{
            return joinPoint.proceed();
        }catch (Throwable throwable){
            throw new RuntimeException("服务器异常");
        }finally {

        }
    }

    private String getKey(String keyExpress, Object[] args){
        for (int i = 0; i < args.length; i++) {
            keyExpress = keyExpress.replace("arg[" + i + "]", args[i].toString());
        }
        return keyExpress;
    }

}

  4、控制层

    @ResponseBody
    @PostMapping(value ="/localLock")
    @ApiOperation(value="重复提交验证测试--使用本地缓存锁")
    @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")})
    @LocalLock(key = "localLock:test:arg[0]")
    public String localLock(String token){

        return "sucess====="+token;
    }

  5、测试

  第一次请求:

  

 

   未过期再次访问:

  

 

 

二、Redis分布式锁

  1、导入依赖

  导入aop依赖和redis依赖即可

  2、配置

  配置redis连接信息即可

  3、自定义分布式锁注解

package com.example.demo.utils;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheLock {
    //redis锁前缀
    String prefix() default "";
    //redis锁过期时间
    int expire() default 5;
    //redis锁过期时间单位
    TimeUnit timeUnit() default TimeUnit.SECONDS;
    //redis  key分隔符
    String delimiter() default ":";
}

  4、自定义key规则注解

  由于redis的key可能是多层级结构,例如 redistest:demo1:token:kkk这种形式,因此需要自定义key的规则。

package com.example.demo.utils;

import java.lang.annotation.*;

@Target({ElementType.METHOD,ElementType.PARAMETER,ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheParam {
    String name() default "";
}

  5、定义key生成策略接口

package com.example.demo.service;

import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.stereotype.Service;

public interface CacheKeyGenerator {
    //获取AOP参数,生成指定缓存Key
    String getLockKey(ProceedingJoinPoint joinPoint);
}

  6、定义key生成策略实现类

package com.example.demo.service.impl;

import com.example.demo.service.CacheKeyGenerator;
import com.example.demo.utils.CacheLock;
import com.example.demo.utils.CacheParam;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class CacheKeyGeneratorImp implements CacheKeyGenerator {
    @Override
    public String getLockKey(ProceedingJoinPoint joinPoint) {
        //获取连接点的方法签名对象
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        //Method对象
        Method method = methodSignature.getMethod();
        //获取Method对象上的注解对象
        CacheLock cacheLock = method.getAnnotation(CacheLock.class);
        //获取方法参数
        final Object[] args = joinPoint.getArgs();
        //获取Method对象上所有的注解
        final Parameter[] parameters = method.getParameters();
        StringBuilder sb = new StringBuilder();
        for(int i=0;i<parameters.length;i++){
            final CacheParam cacheParams = parameters[i].getAnnotation(CacheParam.class);
            //如果属性不是CacheParam注解,则不处理
            if(cacheParams == null){
                continue;
            }
            //如果属性是CacheParam注解,则拼接 连接符(:)+ CacheParam
            sb.append(cacheLock.delimiter()).append(args[i]);
        }
        //如果方法上没有加CacheParam注解
        if(StringUtils.isEmpty(sb.toString())){
            //获取方法上的多个注解(为什么是两层数组:因为第二层数组是只有一个元素的数组)
            final Annotation[][] parameterAnnotations = method.getParameterAnnotations();
            //循环注解
            for(int i=0;i<parameterAnnotations.length;i++){
                final Object object = args[i];
                //获取注解类中所有的属性字段
                final Field[] fields = object.getClass().getDeclaredFields();
                for(Field field : fields){
                    //判断字段上是否有CacheParam注解
                    final CacheParam annotation = field.getAnnotation(CacheParam.class);
                    //如果没有,跳过
                    if(annotation ==null){
                        continue;
                    }
                    //如果有,设置Accessible为true(为true时可以使用反射访问私有变量,否则不能访问私有变量)
                    field.setAccessible(true);
                    //如果属性是CacheParam注解,则拼接 连接符(:)+ CacheParam
                    sb.append(cacheLock.delimiter()).append(ReflectionUtils.getField(field,object));
                }
            }
        }
        //返回指定前缀的key
        return cacheLock.prefix() + sb.toString();
    }
}

  7、分布式注解实现

package com.example.demo.utils;

import com.example.demo.service.CacheKeyGenerator;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;

@Aspect
@Configuration
public class CacheLockMethodInterceptor {



    @Autowired
    public CacheLockMethodInterceptor(StringRedisTemplate stringRedisTemplate, CacheKeyGenerator cacheKeyGenerator){
        this.cacheKeyGenerator = cacheKeyGenerator;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private final StringRedisTemplate stringRedisTemplate;
    private final CacheKeyGenerator cacheKeyGenerator;

    @Around("execution(public * * (..)) && @annotation(com.example.demo.utils.CacheLock)")
    public Object interceptor(ProceedingJoinPoint joinPoint){
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        CacheLock cacheLock = method.getAnnotation(CacheLock.class);
        if(StringUtils.isEmpty(cacheLock.prefix())){
            throw new RuntimeException("前缀不能为空");
        }
        //获取自定义key
        final String lockkey = cacheKeyGenerator.getLockKey(joinPoint);
        final Boolean success = stringRedisTemplate.execute(
                (RedisCallback<Boolean>) connection -> connection.set(lockkey.getBytes(), new byte[0], Expiration.from(cacheLock.expire(), cacheLock.timeUnit())
                        , RedisStringCommands.SetOption.SET_IF_ABSENT));
        if (!success) {
            // TODO 按理来说 我们应该抛出一个自定义的 CacheLockException 异常;这里偷下懒
            throw new RuntimeException("请勿重复请求");
        }
        try {
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            throw new RuntimeException("系统异常");
        }
    }
}

  8、主函数调整

  主函数引入key生成策略

    @Bean
    public CacheKeyGenerator cacheKeyGenerator(){
        return new CacheKeyGeneratorImp();
    }

  9、Controller

    @ResponseBody
    @PostMapping(value ="/cacheLock")
    @ApiOperation(value="重复提交验证测试--使用redis锁")
    @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")})
    //@CacheLock
    @CacheLock()
    public String cacheLock(String token){
        return "sucess====="+token;
    }

    @ResponseBody
    @PostMapping(value ="/cacheLock1")
    @ApiOperation(value="重复提交验证测试--使用redis锁")
    @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")})
    //@CacheLock
    @CacheLock(prefix = "redisLock.test",expire = 20)
    public String cacheLock1(String token){
        return "sucess====="+token;
    }

    @ResponseBody
    @PostMapping(value ="/cacheLock2")
    @ApiOperation(value="重复提交验证测试--使用redis锁")
    @ApiImplicitParams( {@ApiImplicitParam(paramType="query", name = "token", value = "token", dataType = "String")})
    //@CacheLock
    @CacheLock(prefix = "redisLock.test",expire = 20)
    public String cacheLock2(@CacheParam(name = "token") String token){
        return "sucess====="+token;
    }

  10、测试

  (1)由于cacheLock方法的CacheLock注解没有加prefix前缀,因此会报错

 

 

  (2)没有加CacheParam注解

  第一次调用:

  缓存信息:

  可以发现key为prifix的值

 

   第二次调用:

 

 

   (3)增加了CacheParam注解

  第一次调用:

  

 

   缓存信息:

  可以发现缓存的内容为prefix+@CacheParam

  

 

   第二次调用:

 

以上是关于分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Redis实现分布式锁与Zookeeper实现分布式锁区别

分布式锁,及Redis实现分布式锁

分布式锁

高并发架构系列:什么是分布式锁?Redis实现分布式锁详解

Redisson分布式锁设计方案

Redis | 黑马点评 + 思维导图分布式锁