限流实现与解决方案
Posted cuiqq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了限流实现与解决方案相关的知识,希望对你有一定的参考价值。
https://blog.csdn.net/qq_32447301/article/details/86659474
一、限流操作:
为什么限流,是防止用户恶意刷新接口,因为部署在外部服务器,并且我们采用websocket的接口实现的,公司没有对硬件升级,导致程序时长崩溃,为了解决这个问题,请教公司的大佬,提出一个方案,限流操作。但是最后找到原因所在,解决了,吞吐量1万6左右,用的测试服务器,进行测试的,我开发的笔记本进行压测,工具是Jmeter,结果我的电脑未响应,卡了,服务器还没有挂。
限流那些方法
常见的限流:
1、Netflix的hystrix
2、阿里系开源的sentinel
3、说白了限流,为了处理高并发接口那些方式:队列,线程,线程池,消息队列、 kafka、中间件、sentinel:直接拒绝、Warm Up、匀速排队等
技术层面:
1、判断是否有相同的请求,可以通过自身缓存挡住相同请求
2、用负载均衡,比如nginx
3、用缓存数据库,把热点数据get到缓存中,redis,ES
4、善于使用连接池
业务层面:
1、加入交互,排队等待
二、应用级别限流与限流实现:
方法一、使用google的guava,令牌桶算法实现:平滑突发限流 ( SmoothBursty) 、平滑预热限流 ( SmoothWarmingUp) 实现
<!--Java项目广泛依赖 的核心库-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
package com.citydo.dialogue.controller;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
@RestController
public class HomeController {
// 这里的1表示每秒允许处理的量为10个
private RateLimiter limiter = RateLimiter.create(10.0);
//RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit);
//RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
//permitsPerSecond: 表示 每秒新增 的令牌数
// warmupPeriod: 表示在从 冷启动速率 过渡到 平均速率 的时间间隔
@GetMapping("/test/{name}")
public String Test(@PathVariable("name") String name){
// 请求RateLimiter, 超过permits会被阻塞
final double acquire = limiter.acquire();
System.out.println("--------"+acquire);
//判断double是否为空或者为0
if(acquire>=(-1e-6)&&acquire<=(1e-6)){
return name;
}else{
return "操作太频繁";
}
}
}
这个有点类似与QPS流量控制:
当 QPS 超过某个阈值的时候,则采取措施进行流量控制。
直接拒绝:
方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出Exception或者返回值404。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。
方法二、请求一次redis增加1,key可以是IP+时间或者一个标识+时间,没有就创建,需要设置过期时间
设置拦截器:
package com.citydo.dialogue.config; import com.citydo.dialogue.service.AccessLimitInterceptor; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; /** * 拦截器配置 * @author nick */ @Configuration public class InterceptorConfig extends WebMvcConfigurationSupport { @Override public void addInterceptors(InterceptorRegistry registry) { //addPathPatterns 添加拦截规则 registry.addInterceptor(new AccessLimitInterceptor()) //添加需要拦截请求的路径 .addPathPatterns("/**"); //swagger2 放行 .excludePathPatterns("/swagger-resources/**", "/webjars/**", "/v2/**", "/swagger-ui.html/**"); //.excludePathPatterns("/*") //去除拦截请求的路径 } @Override public void addViewControllers(ViewControllerRegistry registry) { registry.addViewController("/"); } }
拦截方法
package com.citydo.dialogue.service; import com.citydo.dialogue.entity.AccessLimit; import com.citydo.dialogue.utils.IpUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.method.HandlerMethod; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; public class AccessLimitInterceptor implements HandlerInterceptor { //使用RedisTemplate操作redis @Autowired private RedisTemplate<String, Integer> redisTemplate; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (handler instanceof HandlerMethod) { HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); if (!method.isAnnotationPresent(AccessLimit.class)) { return true; } AccessLimit accessLimit = method.getAnnotation(AccessLimit.class); if (accessLimit == null) { return true; } int limit = accessLimit.limit(); int sec = accessLimit.sec(); String key = IpUtil.getIpAddr(request) + request.getRequestURI(); //资源唯一标识 String formatDate=new SimpleDateFormat("yyyyMMddHHmm").format(new Date()); //String key="request_"+formatDate; Integer maxLimit = redisTemplate.opsForValue().get(key); if (maxLimit == null) { //set时一定要加过期时间 redisTemplate.opsForValue().set(key, 1, sec, TimeUnit.SECONDS); } else if (maxLimit < limit) { redisTemplate.opsForValue().set(key, maxLimit + 1, sec, TimeUnit.SECONDS); } else { output(response, "请求太频繁!"); return false; } } return true; } public void output(HttpServletResponse response, String msg) throws IOException { response.setContentType("application/json;charset=UTF-8"); ServletOutputStream outputStream = null; try { outputStream = response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); } finally { outputStream.flush(); outputStream.close(); } } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { } }
可以设置成注解,当然也可以直接添加参数
package com.citydo.dialogue.entity; import java.lang.annotation.*; @Inherited @Documented @Target({ElementType.FIELD,ElementType.TYPE,ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface AccessLimit { //标识 指定sec时间段内的访问次数限制 int limit() default 5; //标识 时间段 int sec() default 5; }
redis编写配置
package com.citydo.dialogue.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * 解决redis乱码问题 * 解决配置问题 * @author nick */ @Configuration public class RedisConfig { /** * 此方法解决存储乱码 */ @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>(); template.setConnectionFactory(redisConnectionFactory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setHashKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); template.afterPropertiesSet(); return template; } }
方法三、分布式限流,分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使用redis+lua或者nginx+lua技术进行实现。
方法四、可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是 100,那么本应用最多可以使用 100 个资源,超出了可以 等待 或者 抛异常。
方法五、限流总并发/连接/请求数,如果你使用过 Tomcat,其 Connector 其中一种配置有如下几个参数:
maxThreads:
Tomcat 能启动用来处理请求的 最大线程数,如果请求处理量一直远远大于最大线程数,可能会僵死。
maxConnections:
瞬时最大连接数,超出的会 排队等待。
acceptCount:
如果 Tomcat 的线程都忙于响应,新来的连接会进入 队列排队,如果 超出排队大小,则 拒绝连接。
方法六、限流某个接口的总并发/请求数,使用 Java 中的 AtomicLong,示意代码:
try{ if(atomic.incrementAndGet() > 限流数) { //拒绝请求 } else { //处理请求 } } finally { atomic.decrementAndGet(); }
方法七、 限流某个接口的时间窗请求数使用 Guava 的 Cache,示意代码:
LoadingCache counter = CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(newCacheLoader() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return newAtomicLong(0);
}
});
longlimit =1000;
while(true) {
// 得到当前秒
long currentSeconds = System.currentTimeMillis() /1000;
if(counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println("限流了: " + currentSeconds);
continue;
}
// 业务处理
不管哪种目的是为了,进行限流操作。
参考:https://blog.csdn.net/fanrenxiang/article/details/80683378
参考:https://mp.weixin.qq.com/s/2_oDGJiI1GhaNYnaeL7Qpg
源码:https://github.com/863473007/springboot_current_limiting
以上是关于限流实现与解决方案的主要内容,如果未能解决你的问题,请参考以下文章