高并发之限流实现
Posted xiaozhuanfeng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发之限流实现相关的知识,希望对你有一定的参考价值。
本次样例从单机层面上,采用拦截器的方式对请求限流。
资源:https://github.com/xiaozhuanfeng/rateLimiterProj
工程结构:
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 热启动:项目修改,即时生效 完整的打包环境下运行的时候会被禁用 java -jar启动应用或者用一个特定的classloader启动 认为生产环境 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.jetbrains</groupId> <artifactId>annotations</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> <!-- 引入guava 包--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <!-- 引入fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.properties
#端口号设置
server.port=8007
#日志的输出路径
logging.path=/logs/rateLimiterProj
logging.config=classpath:logback-spring.xml
#自定义限流方式,默认当请求超过指定QPS,放弃请求
rateLimit.qps = 1
rateLimit.type=acquire
rateLimit.tryAcquire.permits=1
rateLimit.tryAcquire.timeOut=100
1、新建抽象拦截器
package com.example.demo.interceptor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.util.IOUtils; import com.example.demo.constant.ResponseEnum; import com.example.demo.dto.ResponseDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; import org.springframework.web.servlet.handler.HandlerInterceptorAdapter; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; public abstract class AbstractInterceptor extends HandlerInterceptorAdapter { public final Logger log = LoggerFactory.getLogger(this.getClass()); /** * 具体拦截方法 * * @param req * @return */ protected abstract ResponseEnum handleRequest(HttpServletRequest req) throws Exception; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { ResponseEnum result = ResponseEnum.SERVER_ERROR; try { result = handleRequest(request); } catch (Exception e) { log.error("handleRequest catch an Exception>>>", e); } if (ResponseEnum.OK == result) { //成功 return true; } handleFailResponse(response, result); return false; } public void handleFailResponse(HttpServletResponse resp, ResponseEnum result) { resp.setStatus(HttpServletResponse.SC_OK); resp.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); ResponseDTO dto = new ResponseDTO(); dto.setCode(result.getCode()); dto.setMesg(result.getMesg()); PrintWriter pw = null; try { pw = resp.getWriter(); pw.write(JSON.toJSONString(dto)); } catch (IOException e) { log.error("handleFailResponse catch an IOException>>>", e); } finally { IOUtils.close(pw); } } }
2、新建RateLimiter Bean
package com.example.demo.configuration; import com.google.common.util.concurrent.RateLimiter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RateLimitConfig { @Value("${rateLimit.qps}") private double qps; @Bean public RateLimiter getRateLimiter() { if(0 == this.qps){ this.qps = 1.0D; } return RateLimiter.create(qps); } }
3、新建返回枚举类和响应实体类
package com.example.demo.constant; /** * 自定义响应码 */ public enum ResponseEnum { OK(200, "成功"), RATE_LIMIT(403, "访问次数受限"), AUTHENTICATION_FAIL(401, "未授权"), SERVER_ERROR(500, "服务器错误"); private int code; private String mesg; ResponseEnum(int code, String mesg) { this.code = code; this.mesg = mesg; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMesg() { return mesg; } public void setMesg(String mesg) { this.mesg = mesg; } @Override public String toString() { return "ResponseEnum{" + "code=" + code + ", mesg=‘" + mesg + ‘‘‘ + ‘}‘; } }
package com.example.demo.dto; public class ResponseDTO { private int code; private String mesg; public ResponseDTO() { } public ResponseDTO(int code, String mesg) { this.code = code; this.mesg = mesg; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMesg() { return mesg; } public void setMesg(String mesg) { this.mesg = mesg; } @Override public String toString() { return "ResponseDTO{" + "code=" + code + ", mesg=‘" + mesg + ‘‘‘ + ‘}‘; } }
4、创建请求拦截器
package com.example.demo.interceptor; import com.example.demo.constant.ResponseEnum; import com.google.common.util.concurrent.RateLimiter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import javax.servlet.http.HttpServletRequest; import java.util.concurrent.TimeUnit; @Component("rateLimitInterceptor") public class RateLimitInterceptor extends AbstractInterceptor{ private static final String RATELIMIT_TYPE_ACQUIRE = "acquire"; @Value("${rateLimit.type}") private String rateLimitType; @Value("${rateLimit.tryAcquire.permits}") private int permits; @Value("${rateLimit.tryAcquire.timeOut}") private long timeout; @Autowired private RateLimiter rateLimiter; @Override protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception { if(RATELIMIT_TYPE_ACQUIRE.equals(rateLimitType)){ return passWaitReq(req); }else{ return giveUpReqWhenExceedLimit(req); } } /** * 阻塞线程直到请求可以再授予许可 * @return */ private ResponseEnum passWaitReq(HttpServletRequest req){ //permits,默认1,从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求数 double waitTime = rateLimiter.acquire(); log.info("请求成功......waitTime="+waitTime); return ResponseEnum.OK; } /** * 判断是否可以立即获取许可,否则放弃请求 * @param req * @return */ private ResponseEnum giveUpReqWhenExceedLimit(HttpServletRequest req){ //timeout 时间内获取令牌,默认timeout=0L,如果可以则挂起等待相应时间并返回true,否则立即返回false if(0 == permits || 0L == timeout){ if(!rateLimiter.tryAcquire()){ log.warn("限流中......"); return ResponseEnum.RATE_LIMIT; } }else{ if(!rateLimiter.tryAcquire(permits,timeout, TimeUnit.MICROSECONDS)){ log.warn("permits="+permits+",timeout="+timeout+",限流中......"); return ResponseEnum.RATE_LIMIT; } } log.info("请求成功......"); return ResponseEnum.OK; } }
5、创建Controller,测试用
package com.example.demo.controller; import org.apache.juli.logging.LogFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @RequestMapping(value="/rateLimit/getHello",method = RequestMethod.GET) public String getHello(){ log.info("rateLimit Hello ........"); try { //发呆1s Thread.sleep(100); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Get:Hello,World"; } @RequestMapping(value="/calculate/getHello",method = RequestMethod.GET) public String postHello(){ log.info("calculate Hello ........"); try { //发呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Post:Hello,World"; } }
6、配置请求拦截器
package com.example.demo.configuration; import com.example.demo.interceptor.CalculateReqInterceptor; import com.example.demo.interceptor.RateLimitInterceptor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; @Configuration public class WebInterceptorConfig extends WebMvcConfigurationSupport { @Bean public RateLimitInterceptor getRateLimitInterceptor() { return new RateLimitInterceptor(); } @Bean public CalculateReqInterceptor getCalculateReqInterceptor() { return new CalculateReqInterceptor(); } @Override protected void addInterceptors(InterceptorRegistry registry) { InterceptorRegistration rateLimitIcpt = registry.addInterceptor(getRateLimitInterceptor()); // 拦截配置 rateLimitIcpt.addPathPatterns("/rateLimit/**"); //需要计算请求时间的请求 //InterceptorRegistration calculateIcpt = registry.addInterceptor(getCalculateReqInterceptor()); //calculateIcpt.addPathPatterns("/calculate/**"); } }
测试:
@Test public void mulitplyReq() throws Exception { String url = "/rateLimit/getHello"; //String url = "/calculate/getHello"; for(int i = 0;i < 10;i++){ sendReq(i,url); } } private void sendReq(int batchNo,String url) throws Exception { /** * 1、mockMvc.perform执行一个请求。 * 2、MockMvcRequestBuilders.get("XXX")构造一个请求。 * 3、ResultActions.param添加请求传值 * 4、ResultActions.accept(MediaType.TEXT_html_VALUE))设置返回类型 * 5、ResultActions.andExpect添加执行完成后的断言。 * 6、ResultActions.andDo添加一个结果处理器,表示要对结果做点什么事情 * 比如此处使用MockMvcResultHandlers.print()输出整个响应结果信息。 * 5、ResultActions.andReturn表示执行完成后返回相应的结果。 */ MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.get(url) .accept(MediaType.TEXT_HTML_VALUE)) // .andExpect(MockMvcResultMatchers.status().isOk()) //等同于Assert.assertEquals(200,status); // .andExpect(MockMvcResultMatchers.content().string("hello lvgang")) //等同于 Assert.assertEquals // ("hello lvgang",content); //.andDo(MockMvcResultHandlers.print())//打印 .andReturn(); MockHttpServletResponse result = mvcResult.getResponse(); System.out.println("Thread"+Thread.currentThread().getName()+">>>batchNo="+batchNo+":"+result.getContentAsString()); }
结果:
15:31:43.428 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.435 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=0:Get:Hello,World
15:31:43.629 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.629 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=1:Get:Hello,World
15:31:44.545 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.812705
15:31:44.545 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=2:Get:Hello,World
15:31:45.544 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.89766
15:31:45.544 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
...
切换下rateLimit.type=tryAcquire,qps=10 ,qps一定要设置恰当,否则用户体验将不太每秒。运行:
@Test public void threadReq() throws Exception { String url = "/rateLimit/getHello"; new Thread(()->{ for(int i = 0;i < 500;i++){ try { sendReq(i,url); } catch (Exception e) { } } }).start(); new Thread(()->{ for(int i = 0;i < 500;i++){ try { sendReq(i,url); } catch (Exception e) { } } }).start(); while(true){ Thread.sleep(60 * 1000); break; } }
下面简单介绍其他应用级限流:
(1)限流总并发/连接/请求数
对于一个应用系统来说一定会有极限并发/请求数,即总有一个TPS/QPS阀值,如果超了阀值则系统就会不响应用户请求或响应的非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。
如果你使用过Tomcat,其Connector 其中一种配置有如下几个参数:
acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;
maxConnections: 瞬时最大连接数,超出的会排队等待;
maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。
详细的配置请参考官方文档。另外如mysql(如max_connections)、Redis(如tcp-backlog)都会有类似的限制连接数的配置。
Springboot内置tomcat属性在org.springframework.boot.autoconfigure.web.ServerProperties,可在application.properties配置属性。
Tomcat特有配置都以”server.tomcat”作为前缀
如上面的:
#端口号设置
server.port=8007
(2)限流总资源数
如果有的资源是稀缺资源(如数据库连接、线程),而且可能有多个系统都会去使用它,那么需要限制应用;可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是100,那么本应用最多可以使用100个资源,超出了可以等待或者抛异常。
(3)限流某个接口的总并发/请求数
package com.example.demo.interceptor; import com.example.demo.constant.ResponseEnum; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.servlet.http.HttpServletRequest; import java.util.concurrent.atomic.AtomicLong; @Service("limitReqNumInterceptor") public class LimitReqNumInterceptor extends AbstractInterceptor{ @Autowired private AtomicLong atomic; private static final long LIMIT_NUM = 10L; @Override protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception { //AtomicLong 对请求数限流 注意:非线程安全(理论上超出限制是可以接受的),请求定制需要在拦截器配置,将会非常多,不好用 if(atomic.incrementAndGet() > LIMIT_NUM) { //拒绝请求 log.warn("超出请求数>>>,LIMIT_NUM="+LIMIT_NUM); return ResponseEnum.RATE_LIMIT; } log.warn("请求通过...."); return ResponseEnum.OK; } //@Scheduled(cron="*/6 * * * * ?") /* private void process(){ if(atomic.get() > LIMIT_NUM) { log.warn("请求达到上限,定时清理请求数>>>"+atomic.get()); //启动定时任务,定时清空请求数 atomic.set(0L); log.warn("定时清理后>>>"+atomic.get()); } }*/ }
限流某个接口的时间窗请求数
利用guava catche,缓存uri路径的方式
/** * 过期时间 */ private static final long EXPIRE_SEC = 20L; /** * 定制 过期时间 */ private LoadingCache<String, AtomicLong> LoadingCache = CacheBuilder.newBuilder().expireAfterWrite(EXPIRE_SEC, TimeUnit.SECONDS).build(new CacheLoader<String, AtomicLong>() { //本地缓存没有命中时,调用load,并将结果缓存 @Override public AtomicLong load(String aLong) throws Exception { return new AtomicLong(0L); } }); @Override protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception { String uri = req.getRequestURI(); AtomicLong atomic = LoadingCache.get(uri); System.out.println("uri="+uri+",当前请求数,number="+atomic.get()); if(atomic.incrementAndGet() > LIMIT_NUM) { //拒绝请求 log.warn("uri="+uri+"超出请求数>>>,LIMIT_NUM="+LIMIT_NUM); return ResponseEnum.RATE_LIMIT; } log.warn("请求通过...."); return ResponseEnum.OK; }
Controller,拦截器设置略
@RequestMapping(value="/numLimit/getHello",method = RequestMethod.GET) public String sayHello(){ log.info("numLimit Hello ........"); try { //发呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Say:Hello,World"; } @RequestMapping(value="/numLimit/getHello2",method = RequestMethod.GET) public String sayHello2(){ log.info("numLimit Hello2 ........"); try { //发呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Say:Hello,World"; }
测试:
@Test public void mulitplyReq2() throws Exception { String url = "/numLimit/getHello"; //String url = "/calculate/getHello"; for (int i = 0; i < 100; i++) { sendReq(i, url); } String url2 = "/numLimit/getHello2"; for (int i = 0; i < 15; i++) { sendReq(i, url2); } Thread.sleep(15 * 1000L); for (int i = 0; i < 15; i++) { sendReq(i, url); } }
参考:
https://www.cnblogs.com/xuwc/p/9123078.html
以上是关于高并发之限流实现的主要内容,如果未能解决你的问题,请参考以下文章