Spring Boot (33) 分布式锁
Posted 海盗屋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot (33) 分布式锁相关的知识,希望对你有一定的参考价值。
上一篇中使用的Guava Cache,如果在集群中就不可以用了,需要借助Redis、Zookeeper之类的中间件实现分布式锁。
导入依赖
在pom.xml中需要添加的依赖包:stater-web、starter-aop、starter-data-redis
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>
属性配置
spring: redis: host: 10.211.55.5 #redis服务器地址 timeout: 10000 #超时时间 database: 0 #0-15 16个库 默认0 lettuce: pool: max-active: 8 #最大连接数 max-wait: -1 #默认-1 最大连接阻塞等待时间 max-idle: 8 #最大空闲连接 默认8 min-idle: 0 #最小空闲连接
CacheLock注解
package com.spring.boot.annotation; import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheLock { /** * redis 锁的前缀 * @return */ String prefix() default ""; /** * 过期时间 * @return */ int expire() default 5; /** * 超时时间单位 * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; /** * 可以的分隔符(默认:) * @return */ String delimiter() default ":"; }
CacheParam注解
package com.spring.boot.annotation; import java.lang.annotation.*; @Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface CacheParam { /** * 字段名称 * * @return String */ String name() default ""; }
Key生成策略(接口)
package com.spring.boot.annotation; import org.aspectj.lang.ProceedingJoinPoint; public interface CacheKeyGenerator { String getLockKey(ProceedingJoinPoint pjp); }
Key生成策略(实现)
主要是解析带CacheLock注解的属性,获取对应的属性值,生成一个全新的缓存Key
package com.spring.boot.annotation; import io.lettuce.core.dynamic.support.ReflectionUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.reflect.MethodSignature; import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Parameter; public class LockKeyGenerator implements CacheKeyGenerator { @Override public String getLockKey(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); CacheLock lockAnnotation = method.getAnnotation(CacheLock.class); final Object[] args = pjp.getArgs(); final Parameter[] parameters = method.getParameters(); StringBuilder builder = new StringBuilder(); // TODO 默认解析方法里面带 CacheParam 注解的属性,如果没有尝试着解析实体对象中的 for (int i = 0; i < parameters.length; i++) { final CacheParam annotation = parameters[i].getAnnotation(CacheParam.class); if (annotation == null) { continue; } builder.append(lockAnnotation.delimiter()).append(args[i]); } if (builder == null || builder.toString() == "") { final Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { final Object object = args[i]; final Field[] fields = object.getClass().getDeclaredFields(); for (Field field : fields) { final CacheParam annotation = field.getAnnotation(CacheParam.class); if (annotation == null) { continue; } field.setAccessible(true); builder.append(lockAnnotation.delimiter()).append(ReflectionUtils.getField(field, object)); } } } return lockAnnotation.prefix() + builder.toString(); } }
Lock拦截器(AOP)
opsForValue().setIfAbsent(key,value)如果缓存中没有当前key则进行缓存,同时返回true,否则 返回false。当缓存后给key在设置个过期时间,防止因为系统崩溃而导致锁迟迟不释放形成死锁。 我们就可以这样人物当返回true它获取到锁了,在所未释放的时候我们进行异常的抛出
package com.spring.boot.annotation; import org.aspectj.lang.annotation.Aspect; import org.springframework.context.annotation.Configuration; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.util.StringUtils; import java.lang.reflect.Method; @Aspect @Configuration public class LockMethodInterceptor { private final StringRedisTemplate lockRedisTemplate; private final CacheKeyGenerator cacheKeyGenerator; @Autowired public LockMethodInterceptor(StringRedisTemplate lockRedisTemplate, CacheKeyGenerator cacheKeyGenerator) { this.lockRedisTemplate = lockRedisTemplate; this.cacheKeyGenerator = cacheKeyGenerator; } @Around("execution(public * *(..)) && @annotation(com.spring.boot.annotation.CacheLock)") public Object interceptor(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); CacheLock lock = method.getAnnotation(CacheLock.class); if (StringUtils.isEmpty(lock.prefix())) { throw new RuntimeException("lock key don\'t null..."); } final String lockKey = cacheKeyGenerator.getLockKey(pjp); try { // 采用原生 API 来实现分布式锁 final Boolean success = lockRedisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(lockKey.getBytes(), new byte[0], Expiration.from(lock.expire(), lock.timeUnit()), RedisStringCommands.SetOption.SET_IF_ABSENT)); if (!success) { // TODO 按理来说 我们应该抛出一个自定义的 CacheLockException 异常; throw new RuntimeException("请勿重复请求"); } try { return pjp.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } finally { // TODO 如果演示的话需要注释该代码;实际应该放开 // lockRedisTemplate.delete(lockKey); } } }
控制器
package com.spring.boot.controller; import com.spring.boot.annotation.CacheLock; import com.spring.boot.annotation.CacheParam; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/books") public class BookController { @CacheLock(prefix = "books") @GetMapping public String query(@CacheParam(name = "token") @RequestParam String token){ return "success - " + token; } }
还要在启动类中注入CacheKeyGenerator接口具体实现
@SpringBootApplication public class BootApplication{ public static void main(String[] args) { SpringApplication.run(BootApplication.class,args); } @Bean public CacheKeyGenerator cacheKeyGenerator() { return new LockKeyGenerator(); }
测试:
http://localhost:8088/books?token=1
5秒内再次请求
以上是关于Spring Boot (33) 分布式锁的主要内容,如果未能解决你的问题,请参考以下文章