Merging Identical Requests

Posted by zhshlimi


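In a microservice architecture, many identical requests often arrive concurrently. On a cache miss, several of them can fall through the cache to the database at the same time, which increases the load on the DB.

This article uses a Redis distributed lock to merge identical requests.

When two or more identical requests arrive, they compete for the lock, which serializes them.

Even when the cache entry has expired, only one thread goes to the DB while the others wait and retry for a bounded number of attempts, which reduces the risk of cache penetration.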
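The flow described above can be sketched without Redis at all. The following is a minimal illustration, not the article's implementation: in-memory maps stand in for the cache and for the lock (`putIfAbsent` plays the role of `SET key NX`), and `MergeRequestsSketch`, `queryDb`, and `runConcurrently` are hypothetical names introduced only for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory stand-in for cache + distributed lock; illustrative only. */
final class MergeRequestsSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Map<String, Boolean> locks = new ConcurrentHashMap<>();
    final AtomicInteger dbHits = new AtomicInteger();

    /** Simulated slow database query (hypothetical). */
    private String queryDb(String id) {
        dbHits.incrementAndGet();
        sleep(50);
        return "value-" + id;
    }

    String get(String id) {
        for (int attempt = 0; attempt < 100; attempt++) {
            String cached = cache.get(id);
            if (cached != null) {
                return cached;
            }
            // putIfAbsent succeeds for exactly one caller, like SET key NX
            if (locks.putIfAbsent(id, Boolean.TRUE) == null) {
                try {
                    // Re-check: the previous lock holder may have filled the cache
                    cached = cache.get(id);
                    if (cached != null) {
                        return cached;
                    }
                    String value = queryDb(id);
                    cache.put(id, value);
                    return value;
                } finally {
                    locks.remove(id);
                }
            }
            sleep(5); // lost the race: wait, then retry
        }
        return queryDb(id); // retries exhausted: fall back to the DB
    }

    /** Fires identical requests from several threads and reports how many reached the DB. */
    static int runConcurrently(int threadCount) {
        MergeRequestsSketch sketch = new MergeRequestsSketch();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> sketch.get("42"));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.dbHits.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling `MergeRequestsSketch.runConcurrently(8)` sends eight identical requests at once; only the thread that wins the lock runs the simulated query, and the rest pick the value up from the cache on a later attempt.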

 

The code is as follows.

1. Add dependencies

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.6.11</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.6.11</version>
        </dependency>
        <!-- cglib -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.1</version>
        </dependency>

        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

 

2. Add the annotation

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Annotation for merging duplicate requests.
 * Only supports methods that take a single parameter.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MergeDuplicationRequestAttribute {
    /**
     * Template for the distributed lock key, e.g. A:b:%s
     */
    String redisLockKeyTemplate() default "";

    /**
     * Property names whose values make up the lock key, e.g. id.
     * The actual lock key is String.format(redisLockKeyTemplate, getValues(object, fields)).
     */
    String[] redisLockKeyObjectFileds() default {};

    /**
     * Expiry of the distributed lock, in milliseconds.
     */
    int expireMillseconds() default 1000;

    /**
     * Interval between lock retries, in milliseconds.
     */
    int retryIntervalMillseconds() default 20;

    /**
     * Number of lock retries.
     */
    int retryTimes() default 3;
}
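To make the key template concrete, here is a small sketch of how a lock key is derived. It assumes, as the helper class in this article does by default, that the selected property values are joined with "_" into one string before being passed to `String.format`, so the template should contain exactly one `%s`. `LockKeyExample` and `buildKey` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class LockKeyExample {
    /** Joins the selected property values with "_" and substitutes them into the template. */
    static String buildKey(String template, List<String> fieldValues) {
        if (fieldValues.isEmpty()) {
            return template; // no fields configured: the template itself is the key
        }
        return String.format(template, String.join("_", fieldValues));
    }

    public static void main(String[] args) {
        System.out.println(buildKey("A:b:%s", Arrays.asList("123")));       // A:b:123
        System.out.println(buildKey("order:%s", Arrays.asList("7", "CN"))); // order:7_CN
    }
}
```

Requests that produce the same key compete for the same lock, so the chosen properties should identify exactly the requests that are meant to be merged.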

 

3. Add the aspect

import com.g2.order.server.annotation.MergeDuplicationRequestAttribute;
import com.g2.order.server.config.RedisLock;
import com.g2.order.server.utils.ObjectUtils;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Arrays;

// Enable AspectJ auto-proxying. proxyTargetClass defaults to false; setting it to true forces CGLIB class-based proxies.
@EnableAspectJAutoProxy(proxyTargetClass = true)
@Component
@Order(-1)
@Aspect
public class MergeDuplicationRequestAspect {
    private static Logger logger = LoggerFactory.getLogger(MergeDuplicationRequestAspect.class);

    @Autowired
    private RedisLock redisLock;

    @Pointcut("@annotation(com.g2.order.server.annotation.MergeDuplicationRequestAttribute)")
    public void mergeDuplicationRequest() {

    }

    @Around("mergeDuplicationRequest()")
    public Object handleControllerMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        // Get the signature of the intercepted controller method.
        MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();

        // Get the method itself
        Method method = methodSignature.getMethod();

        MergeDuplicationRequestAttribute annotation = method.getAnnotation(MergeDuplicationRequestAttribute.class);
        String key = annotation.redisLockKeyTemplate();
        String redisKey = key;
        Object[] args = proceedingJoinPoint.getArgs();
        if (args.length > 0) {
            Object param = args[0];
            String[] paramFields = annotation.redisLockKeyObjectFileds();
            if (paramFields.length > 0) {
                String propertiesValue = ObjectUtils.getPropertiesValue(param, Arrays.asList(paramFields));
                redisKey = String.format(key, propertiesValue);
            }
        }

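        int retryIntervalMillseconds = annotation.retryIntervalMillseconds();
        int retryTimes = annotation.retryTimes();
        int expireMillseconds = annotation.expireMillseconds();
        boolean locked = redisLock.lock(redisKey, expireMillseconds);
        for (int i = 1; !locked && i <= retryTimes; i++) {
            try {
                // Log message: "identical request in flight, sleep #{}"
                logger.info("有相同的请求,第{}次休眠", i);
                Thread.sleep(retryIntervalMillseconds);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
            locked = redisLock.lock(redisKey, expireMillseconds);
        }

        try {
            // If the retries are exhausted, the request proceeds without holding the lock
            return proceedingJoinPoint.proceed();
        } finally {
            if (locked) {
                // Release early instead of waiting for expiry. unlock() is a plain DEL, so if the
                // lock expired mid-call this may delete a lock that another request has since taken.
                redisLock.unlock(redisKey);
            }
        }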
    }
}

 

3. Add the distributed lock code (using a Redis cluster)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Custom Redis configuration class
 *
 * @date 2017/10/19
 */
@Slf4j
@Configuration
public class RedisConfig {

    /**
     * jedisCluster
     */
    @Bean
    @Autowired
    public JedisCluster jedisCluster(@Qualifier("jedis.pool.config") JedisPoolConfig config,
            @Value("${redis.host.address}") String hostAndPort,
            @Value("${redis.password}") String password) {
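        log.info("Initializing Redis...");
        /**
         * 1. Check that the Redis cluster addresses are configured
         */
        if (StringUtils.isEmpty(hostAndPort)) {
            throw new RuntimeException("Failed to initialize the Redis cluster. Check the redis.host.address setting.");
        }

        /**
         * 2. Build the HostAndPort set from the configuration
         */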
        Set<HostAndPort> hostAndPorts = Arrays.asList(hostAndPort.split(",")).stream().map(s -> {
            String[] split = s.split(":");
            return new HostAndPort(split[0], Integer.valueOf(split[1]));
        }).collect(Collectors.toSet());

        return new JedisCluster(hostAndPorts, 1000, 1000, 1, password, config);
    }

    @Bean(name = "jedis.pool.config")
    public JedisPoolConfig jedisPoolConfig(@Value("${jedis.pool.config.maxTotal}") int maxTotal,
            @Value("${jedis.pool.config.maxWaitMillis}") int maxWaitMillis,
            @Value("${jedis.pool.config.maxIdle}") int maxIdle) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxTotal);
        config.setMaxIdle(maxIdle);
        config.setMaxWaitMillis(maxWaitMillis);
        return config;
    }
}
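The address parsing in `jedisCluster` trusts the `redis.host.address` value: an entry without a port, or with stray spaces after a comma, fails with a low-level exception. A stricter parse could look like the sketch below. It uses `java.net.InetSocketAddress` in place of Jedis's `HostAndPort` purely so the example runs without Jedis on the classpath; `HostListParser` is a hypothetical name.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

final class HostListParser {
    /** Parses "host:port,host:port" into unresolved addresses, rejecting malformed entries. */
    static List<InetSocketAddress> parse(String csv) {
        List<InetSocketAddress> nodes = new ArrayList<>();
        for (String entry : csv.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            // A non-numeric port raises NumberFormatException, a subclass of IllegalArgumentException
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            nodes.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
        }
        return nodes;
    }

    public static void main(String[] args) {
        for (InetSocketAddress node : parse("127.0.0.1:6389, 127.0.0.1:6479")) {
            System.out.println(node.getHostString() + " -> " + node.getPort());
        }
    }
}
```

Failing fast with a message that names the offending entry makes a misconfigured address list easier to diagnose at startup.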

 

/**
 * Custom Redis lock service
 *
 * @date 2017/10/19
 */
public interface RedisLock {
    String OK_CODE = "OK";
    String OK_MULTI_CODE = "+OK";

    /**
     * Acquires the lock.
     *
     * @param lockKey lock key
     * @param millseconds expiry time in milliseconds
     * @return true if the lock was acquired; false otherwise
     */
    boolean lock(final String lockKey, final int millseconds);

    /**
     * Releases the lock.
     *
     * @param key lock key
     * @return true if the lock was released
     */
    boolean unlock(String key);

    default boolean isStatusOk(String status) {
        return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status));
    }
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisCluster;

/**
 * Default implementation of the custom Redis lock service
 *
 * @date 2017/10/19
 */
@Service
@Slf4j
public class DefaultRedisLock implements RedisLock {

    @Autowired
    private JedisCluster redisService;

    @Override
    public boolean lock(String lockKey, int millseconds) {
        return isStatusOk(redisService.set(lockKey, "1", "NX", "PX", millseconds));
    }

    @Override
    public boolean unlock(String lockKey) {
        return redisService.del(lockKey) == 1;
    }
}
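One limitation of the lock above is that it stores the constant value "1", so `unlock` cannot tell whose lock it is deleting. A common refinement is to store a random owner token and delete the key only if the token still matches. The sketch below shows that compare-and-delete idea with an in-memory map instead of Redis; it does not simulate expiry, and `OwnedLockSketch` is a hypothetical name. On a real Redis server the compare and the delete have to happen atomically, which is typically done with a short Lua script.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class OwnedLockSketch {
    // key -> owner token; stands in for the Redis keyspace
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Returns a fresh owner token if the lock was acquired, or null if it is already held. */
    String tryLock(String key) {
        String token = UUID.randomUUID().toString();
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    /** Removes the key only when the stored token still matches the caller's token. */
    boolean unlock(String key, String token) {
        return token != null && store.remove(key, token);
    }

    public static void main(String[] args) {
        OwnedLockSketch lock = new OwnedLockSketch();
        String owner = lock.tryLock("A:123");
        System.out.println("second caller acquired: " + (lock.tryLock("A:123") != null));
        System.out.println("foreign unlock succeeded: " + lock.unlock("A:123", "not-the-owner"));
        System.out.println("owner unlock succeeded: " + lock.unlock("A:123", owner));
    }
}
```

With an owner token, a request whose lock has already expired can no longer release a lock that a later request acquired in the meantime.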

 

4. Add a helper class

import com.google.common.collect.Lists;
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 *  Object helper class
 */
public class ObjectUtils {

    public static Object getValue(Object object, String propertyName)
            throws IntrospectionException, IllegalAccessException, InvocationTargetException {
        Class aClass = object.getClass();
        if (isBaseClassOrString(aClass)) {
            return object.toString();
        }

        if (isArrayOrList(aClass)) {
            return object.toString();
        }

        if (isMap(aClass)) {
            return ((Map) object).getOrDefault(propertyName, "null");
        }

        Field[] fields = aClass.getDeclaredFields();

        for (Field f : fields) {
            if (!f.getName().equals(propertyName)) {
                continue;
            }

            PropertyDescriptor descriptor = new PropertyDescriptor(f.getName(), aClass);
            Method readMethod = descriptor.getReadMethod();
            Object result = readMethod.invoke(object);
            return result;
        }

        return "null";
    }

    public static String getPropertiesValue(Object object, List<String> propertyNames)
            throws IntrospectionException, IllegalAccessException, InvocationTargetException {
        return getPropertiesValue(object, propertyNames, "_");
    }

    public static String getPropertiesValue(Object object, List<String> propertyNames, String delimiter)
            throws IntrospectionException, IllegalAccessException, InvocationTargetException {

        Function<List<Object>, String> joinFunction = list ->
                list.stream().map(p -> p == null ? "null" : p.toString()).collect(Collectors.joining(delimiter));

        return getPropertiesValue(object, propertyNames, joinFunction);
    }

    public static String getPropertiesValue(Object object, List<String> propertyNames, Function<List<Object>, String>
            joinFunction)
            throws IntrospectionException, IllegalAccessException, InvocationTargetException {
        List<Object> objects = Lists.newArrayList();
        for (String p : propertyNames) {
            Object result = getValue(object, p);
            objects.add(result);
        }

        return joinFunction.apply(objects);
    }

    private static boolean isBaseClassOrString(Class aClass) {
        return (aClass == String.class)
                || (aClass == Byte.class)
                || (aClass == Short.class)
                || (aClass == Integer.class)
                || (aClass == Double.class)
                || (aClass == Long.class)
                || (aClass == Boolean.class)
                || (aClass == Float.class)
                || (aClass == java.lang.Character.class);
    }

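    private static boolean isArrayOrList(Class<?> aClass) {
        // The callers pass the argument's Class, so test the type itself
        return aClass.isArray()
                || List.class.isAssignableFrom(aClass);
    }

    private static boolean isMap(Class<?> aClass) {
        return Map.class.isAssignableFrom(aClass);
    }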
}
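The reflective lookup in `getValue` relies on `new PropertyDescriptor(name, beanClass)`, which, as far as I know, expects the bean to expose both a getter and a setter and throws `IntrospectionException` otherwise. For request objects that are read-only, a lookup through `Introspector.getBeanInfo` is one alternative. The sketch below is an assumption-laden illustration, not part of the article's code: `PropertyReadSketch`, `OrderQuery`, and `readProperty` are hypothetical names.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;

final class PropertyReadSketch {
    /** Hypothetical request object with a getter but no setter. */
    public static final class OrderQuery {
        private final String code;

        public OrderQuery(String code) {
            this.code = code;
        }

        public String getCode() {
            return code;
        }
    }

    /** Returns the value of the named bean property, or null if no readable property matches. */
    static Object readProperty(Object bean, String name) {
        try {
            // Stop at Object.class so that getClass() is not reported as a property
            BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
            for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
                Method getter = descriptor.getReadMethod();
                if (descriptor.getName().equals(name) && getter != null) {
                    return getter.invoke(bean);
                }
            }
            return null;
        } catch (IntrospectionException | ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read property " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readProperty(new OrderQuery("123"), "code"));
    }
}
```

Returning `null` for an unknown property keeps the behaviour close to the helper above, which falls back to the string "null" when joining values.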

 

5. Add the startup code, business code, and configuration

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;


/**
 * Application entry point
 */
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

import com.g2.order.server.annotation.MergeDuplicationRequestAttribute;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import io.swagger.annotations.Api;

@Api(value = "H5Controller", description = "H5 API")
@RestController
@RequestMapping("/h5")
public class H5Controller {
    private static Logger logger = LoggerFactory.getLogger(H5Controller.class);

    @MergeDuplicationRequestAttribute(redisLockKeyTemplate = "A:%s", redisLockKeyObjectFileds = {"code"})
    @RequestMapping(value = "/{code}.jsonp",method = RequestMethod.GET)
    public Object testJsonp2(@PathVariable("code") String code) {
        try {
            // Simulate a slow operation such as a DB query
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return "234";
    }
}

server.port=88
redis.host.address=127.0.0.1:6389,127.0.0.1:6479,127.0.0.1:6579
redis.password=******
jedis.pool.config.maxTotal=100
jedis.pool.config.maxIdle=10
jedis.pool.config.maxWaitMillis=100000

 

6. Test results

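Rapidly refreshing the same address, http://127.0.0.1:88/h5/123.jsonp, in two browsers produces the following log (the message reads "identical request in flight, sleep #N"):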

2019-04-19 18:06:34.458  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第1次休眠
2019-04-19 18:06:34.479  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第2次休眠
2019-04-19 18:06:34.500  INFO 15080 --- [p-nio-88-exec-2] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第3次休眠
2019-04-19 18:06:56.061  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第1次休眠
2019-04-19 18:06:56.082  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第2次休眠
2019-04-19 18:06:56.103  INFO 15080 --- [p-nio-88-exec-5] .s.a.MergeDuplicationRequestAspectConfig : 有相同的请求,第3次休眠

 

7. Possible improvements

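While waiting to retry, the aspect should be allowed to call another method that reads the cached data. If that succeeds, it stops waiting and returns immediately; only if it fails does it keep competing for the lock.

The pseudocode might look like this:

Add a field to the annotation: getCacheFunction = "abc"

In the aspect, call:

Object theInstance = proceedingJoinPoint.getTarget();

Object cacheValueObj = MethodHelper.invoke(theInstance, getCacheFunction, args)...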
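The improvement outlined above can be sketched independently of Spring and Redis. In this minimal version the lock attempt, the cache lookup, and the target method are passed in as functions, so the control flow is visible on its own; `RetryWithCacheSketch` and its parameter names are hypothetical, and releasing the lock is left out to keep the example short.

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class RetryWithCacheSketch {
    /**
     * Tries to take the lock; while waiting, consults the cache after every sleep and
     * returns at once on a hit. Falls through to the target method otherwise.
     */
    static <T> T execute(BooleanSupplier tryLock,
                         Supplier<Optional<T>> cacheLookup,
                         Supplier<T> target,
                         int retryTimes,
                         long retryIntervalMillis) {
        if (!tryLock.getAsBoolean()) {
            for (int i = 1; i <= retryTimes; i++) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break; // stop waiting when interrupted
                }
                // The lock holder may have populated the cache while we slept
                Optional<T> cached = cacheLookup.get();
                if (cached.isPresent()) {
                    return cached.get();
                }
                if (tryLock.getAsBoolean()) {
                    break; // we now hold the lock
                }
            }
        }
        return target.get();
    }

    public static void main(String[] args) {
        String result = execute(() -> false, () -> Optional.of("from-cache"), () -> "from-db", 3, 20L);
        System.out.println(result);
    }
}
```

In the aspect, `cacheLookup` would correspond to the reflective call to the method named by `getCacheFunction`, and `target` to `proceedingJoinPoint.proceed()`.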

 
