合并相同请求
Posted zhshlimi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了合并相同请求相关的知识,希望对你有一定的参考价值。
在微服务里经常有并发相同的请求过来,当未命中缓存时,可能多条请求一起穿透缓存到DB,这就导致DB压力的增大
本文使用redis的分布式锁来合并相同的请求
当两个以上相同的请求来请求时, 通过竞争实现将相同的请求线性化.
假设缓存失效,也只有1个线程去访问DB,其他线程在等待和重试 来降低缓存穿透的风险.
代码如下
1.添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> <version>1.5.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>1.5.2.RELEASE</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>1.6.11</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.6.11</version> </dependency> <!-- cglib --> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.1</version> </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
2.添加注解
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 合并请求的注解 * 仅支持单参数的接口方法 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface MergeDuplicationRequestAttribute { /** * 分布式锁的key字符串 如 A:b:%s * @return */ String redisLockKeyTemplate() default ""; /** * 分布式锁的key组合项, 如id * 通过 String.format(redisLockKeyTemplate,getValues(object,fields)) 获取真实的分布式锁的key * @return */ String[] redisLockKeyObjectFileds() default {}; /** * 分布式锁的过期时间(毫秒) * @return */ int expireMillseconds() default 1000; /** * 分布式锁的重试间隔(毫秒) * @return */ int retryIntervalMillseconds() default 20; /** * 分布式锁的重试次数 * @return */ int retryTimes() default 3; }
3.添加切面
import com.g2.order.server.annotation.MergeDuplicationRequestAttribute; import com.g2.order.server.config.RedisLock; import com.g2.order.server.utils.ObjectUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.util.Arrays; //开启AspectJ 自动代理模式,如果不填proxyTargetClass=true,默认为false, @EnableAspectJAutoProxy(proxyTargetClass = true) @Component @Order(-1) @Aspect public class MergeDuplicationRequestAspect { private static Logger logger = LoggerFactory.getLogger(MergeDuplicationRequestAspect.class); @Autowired private RedisLock redisLock; @Pointcut("@annotation(com.g2.order.server.annotation.MergeDuplicationRequestAttribute)") public void mergeDuplicationRequest() { } @Around("mergeDuplicationRequest()") public Object handleControllerMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { //获取controller对应的方法. MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature(); //获取方法 Method method = methodSignature.getMethod(); MergeDuplicationRequestAttribute annotation = method.getAnnotation(MergeDuplicationRequestAttribute.class); String key = annotation.redisLockKeyTemplate(); String redisKey = key; Object[] args = proceedingJoinPoint.getArgs(); if (args.length > 0) { Object param = args[0]; String[] paramFields = annotation.redisLockKeyObjectFileds(); if (paramFields.length > 0) { String propertiesValue = ObjectUtils.getPropertiesValue(param, Arrays.asList(paramFields)); redisKey = String.format(key, propertiesValue); } } int retryIntervalMillseconds = annotation.retryIntervalMillseconds(); int retryTimes = annotation.retryTimes(); int expireMillseconds = annotation.expireMillseconds(); if (!redisLock.lock(redisKey, expireMillseconds)) { for (int i = 1; i <= retryTimes; i++) { try { logger.info("有相同的请求,第{}次休眠",i); Thread.sleep(retryIntervalMillseconds); } catch (InterruptedException ex) { } if (redisLock.lock(redisKey, expireMillseconds)) { break; } } } return proceedingJoinPoint.proceed(); } }
3.添加分布式锁代码(使用redis集群)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; /** * 自定义Redis配置类 * * * @date 2017/10/19 */ @Slf4j @Configuration public class RedisConfig { /** * jedisCluster */ @Bean @Autowired public JedisCluster jedisCluster(@Qualifier("jedis.pool.config") JedisPoolConfig config, @Value("${redis.host.address}") String hostAndPort, @Value("${redis.password}") String password) { log.info("开始初始化redis..."); /** * 1 先检查redis集群是否已经配置 */ if (StringUtils.isEmpty(hostAndPort)) { throw new RuntimeException("Redis 集群初始化异常。请检查配置redis.host.address配置项"); } /** * 2 根据配置构建hostAndPorts */ Set<HostAndPort> hostAndPorts = Arrays.asList(hostAndPort.split(",")).stream().map(s -> { String[] split = s.split(":"); return new HostAndPort(split[0], Integer.valueOf(split[1])); }).collect(Collectors.toSet()); return new JedisCluster(hostAndPorts, 1000, 1000, 1, password, config); } @Bean(name = "jedis.pool.config") public JedisPoolConfig jedisPoolConfig(@Value("${jedis.pool.config.maxTotal}") int maxTotal, @Value("${jedis.pool.config.maxWaitMillis}") int maxWaitMillis, @Value("${jedis.pool.config.maxIdle}") int maxIdle) { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(maxTotal); config.setMaxIdle(maxIdle); config.setMaxWaitMillis(maxWaitMillis); return config; } }
/** * 自定义Redis服务 * * * @date 2017/10/19 */ public interface RedisLock { String OK_CODE = "OK"; String OK_MULTI_CODE = "+OK"; /** * 加锁 * * @param lockKey 锁key * @param millseconds 过期时间(毫秒) * @return true:成功获取锁;false:没有获取到锁 */ boolean lock(final String lockKey, final int millseconds); /** * 解锁 * * @param key 锁key * @return true:成功解锁; */ boolean unlock(String key); default boolean isStatusOk(String status) { return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status)); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.JedisCluster; /** * 自定义Redis服务 * * * @date 2017/10/19 */ @Service @Slf4j public class DefaultRedisLock implements RedisLock { @Autowired private JedisCluster redisService; @Override public boolean lock(String lockKey, int millseconds) { return isStatusOk(redisService.set(lockKey, "1", "NX", "PX", millseconds)); } @Override public boolean unlock(String lockKey) { return redisService.del(lockKey) == 1; } }
4.添加辅助类
import com.google.common.collect.Lists; import java.beans.IntrospectionException; import java.beans.PropertyDescriptor; import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; /** * Object帮助类 */ public class ObjectUtils { public static Object getValue(Object object, String propertyName) throws IntrospectionException, IllegalAccessException, InvocationTargetException { Class aClass = object.getClass(); if (isBaseClassOrString(aClass)) { return object.toString(); } if (isArrayOrList(aClass)) { return object.toString(); } if (isMap(aClass)) { return ((Map) object).getOrDefault(propertyName, "null"); } Field[] fields = aClass.getDeclaredFields(); for (Field f : fields) { if (!f.getName().equals(propertyName)) { continue; } PropertyDescriptor descriptor = new PropertyDescriptor(f.getName(), aClass); Method readMethod = descriptor.getReadMethod(); Object result = readMethod.invoke(object); return result; } return "null"; } public static String getPropertiesValue(Object object, List<String> propertyNames) throws IntrospectionException, IllegalAccessException, InvocationTargetException { return getPropertiesValue(object, propertyNames, "_"); } public static String getPropertiesValue(Object object, List<String> propertyNames, String delimiter) throws IntrospectionException, IllegalAccessException, InvocationTargetException { Function<List<Object>, String> joinFunction = list -> list.stream().map(p -> p == null ? "null" : p.toString()).collect(Collectors.joining(delimiter)); return getPropertiesValue(object, propertyNames, joinFunction); } public static String getPropertiesValue(Object object, List<String> propertyNames, Function<List<Object>, String> joinFunction) throws IntrospectionException, IllegalAccessException, InvocationTargetException { List<Object> objects = Lists.newArrayList(); for (String p : propertyNames) { Object result = getValue(object, p); objects.add(result); } return joinFunction.apply(objects); } private static boolean isBaseClassOrString(Class aClass) { return (aClass == String.class) || (aClass == Byte.class) || (aClass == Short.class) || (aClass == Integer.class) || (aClass == Double.class) || (aClass == Long.class) || (aClass == Boolean.class) || (aClass == Float.class) || (aClass == java.lang.Character.class); } private static boolean isArrayOrList(Object obj) { return obj instanceof Array || obj instanceof List; } private static boolean isMap(Object obj) { return obj instanceof Map; } }
5.添加 启动代码及业务代码 和配置项
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.SpringApplication; import org.springframework.boot.web.servlet.ServletComponentScan; /** * 程序入口 */ @SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }
import com.g2.order.server.annotation.MergeDuplicationRequestAttribute; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import io.swagger.annotations.Api; @Api(value = "H5Controller", description = "H5接口") @RestController @RequestMapping("/h5") public class H5Controller { private static Logger logger = LoggerFactory.getLogger(H5Controller.class); @MergeDuplicationRequestAttribute(redisLockKeyTemplate = "A:%s", redisLockKeyObjectFileds = {"code"}) @RequestMapping(value = "/{code}.jsonp",method = RequestMethod.GET) public Object testJsonp2(@PathVariable("code") String code) { try { Thread.sleep(1000); }catch (Exception ex){ } return "234"; } }
server.port=88 redis.host.address=127.0.0.1:6389,127.0.0.1:6479,127.0.0.1:6579 redis.password=****** jedis.pool.config.maxTotal=100 jedis.pool.config.maxIdle=10 jedis.pool.config.maxWaitMillis=100000
6.测试结果如下:
当快速刷新两个浏览器 访问相同的地址 http://127.0.0.1:88/h5/123.jsonp 时,产生如下日志
2019-04-19 18:06:34.458 INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第1次休眠 2019-04-19 18:06:34.479 INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第2次休眠 2019-04-19 18:06:34.500 INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第3次休眠 2019-04-19 18:06:56.061 INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第1次休眠 2019-04-19 18:06:56.082 INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第2次休眠 2019-04-19 18:06:56.103 INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第3次休眠
7.改进方向
在重试等待中,应当允许去访问另一个方法来获取缓存数据,如果成功立即中断返回.如果不成功才继续竞争锁
伪代码可能如下:
注解里增加 字段 getCacheFunction ="abc"
在切面里 调用
Object theInstance=proceedingJoinPoint.getTarget();
Object cacheValueObj=MethodHelper.invoke(theInstance,getCacheFunction ,args)...
以上是关于合并相同请求的主要内容,如果未能解决你的问题,请参考以下文章