微服务-限流架构设计

Posted 蟋蟀得不像砖家派

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务-限流架构设计相关的知识,希望对你有一定的参考价值。

前言

 


根据以上场景,可以联想到有一个排队机, 先取一个号, 然后等待叫号, 办事窗口多, 号就叫得快, 办事窗口少, 号就叫得慢, 排队机是一个了不起的发明, 这里有许多值得我们在编程时借鉴的东西。 

可以看到的是排号机是一个总调度器,各个窗口是负责独立处理客户业务的线程,每个线程有自己的排号节奏,当处理不过来的时候,排号机可以放慢出号的速度,甚至直接拒绝出号。

回到我们微服务技术来讲, 任何服务都有容量的限制, 为了使我们的服务保持高可用性, 我们必须对系统进行限流, 也称速率限制 Rate Limiting

实现限流技术的方案有很多种,最常用的莫过于消息队列了。

而消息队列有一个明显的特征,就是对于生产者而言,可以肆无忌惮的放队列里进行push消息,

消费方只能按照一定的顺序来按需消费,最终的结果导致队列挤压,前端超时等待。并且整个过程是异步操作,带来了额外的复杂性。

而这微服务讲的限流主要是控制流率,整个请求保持同步,保证后端的处理能力维持在一个理想水平。

服务器端对客户端的请求进行监控,当发觉某个客户端发送了过多或过快的请求就会做出限制, 根据预先制定的策略针对某个客户端的IP, 帐户或类型进行限流, 从而保证了对大多数正常请求的服务不受影响, 防止拒绝服务 DoS (denial of service) 和分布式拒绝服务 DDoS (distributed denial of service) , DoS 是很常见的网络攻击方式, 限流或者说速率控制 Rate Limiting 是行之有效的应对手段。

限流可以是比较暴力方式, 只根据每秒请求数的阈值来进行控制, 超过 QPS/TPS 上限的请求一律拒绝掉, 这种方式 有效, 但是不能精准打击那些攻击者, 反而会误伤无辜。

 

限流的解决方案

>业务解决方案

 用户纬度进行限制

我们在服务器签发的 token 中包含用户信息: userId, orgId, 然后就可以针对 userId orgId 做单独的计数, 如果在特定时间单位中超过最大数量阈值, 则拒绝此特定用户或组织的请求. 实际应用中就可以在 JWT(Json Web Token) 中添加自定义字段来表示用户, 组织及应用程序的标识信息。

 

应用程序层面 Application Level

   漏桶策略

类似用户层面的限流, 一旦我们可以辨别出应用程序的标识, 就可以针对特定应用程序的请求进行计数, 按照下面介绍的限流算法来进行速率控制

就象我们生活中常见的漏斗, 从油桶往油瓶里倒油, 没有漏斗, 除非是卖油翁那样的专家, 多数情况下油都会跑冒滴漏

 

 

漏桶是总容量是不变的, 水滴(请求) 以任意速率流入, 但总是以恒定速率流出, 如果请求来得太多太快, 桶的容量就会撑满, 后续的请求就会被拒绝, 也就是说当一个请求到来, 就流一滴水进桶里,如果可以放入, 则处理此请求, 否则漏桶已满, 则拒绝此请求, 直到桶中水滴不再满时

   令牌桶策略

令牌桶与上而的漏桶异曲同工, 只不过它不是以固定速率流出, 而是以固定速率放入令牌到令牌桶中, 请求到来时从令牌桶中领取一个令牌才可继续处理服务, 如果取不到令牌, 则拒绝此请求

 

技术实现方式

 

>JDK Semaphore 信号量,

控制并发的数量即 控制同时有几个线程同时执行 。

线程在执行前调用Semaphore 的acquire()方法获取线程执行权限,如果信号量里面有余的令牌,则当前线程可以执行,没有的话就等待

线程执行结束则调用release(),释放当前的令牌,其他线程就可以获取这个令牌,继续执行

获取可以的剩余信号令牌:availablePermits() 

代码如下:

/**

 * 作用:Semaphore 信号量,控制并发的数量即 控制同时有几个线程同时执行

 * 场景:前提:一个商店的工人倒班工作,按照工时给工资

 *      商店同时需要5个人同时工作, 多余的其他工人等到这5个工人需要休息了,就顶上继续工作

 */

public class SemaphoreTest {

 

public static void main(String[] args) {

 Semaphore semaphore = new Semaphore(5);

 ExecutorService executor = Executors.newFixedThreadPool(30) ;

 

 for(int i=0;i<10;i++ ){

 executor.execute(  new SlaveThread4(semaphore,  i)); 

 }

 

 executor.shutdown();

}

}

 

//工人类

class SlaveThread4  implements Runnable {

//工人编号

private Integer number ;

//线程开始标识

private Semaphore semaphore;

public SlaveThread4(Semaphore semaphore ,  int number){

this.number = number ;

this.semaphore = semaphore ;

}


@Override

public void run() {

try {

semaphore.acquire();

System.out.println( "工人  "+number+"  开始工作"+new Date());

work() ;

semaphore.release();

System.out.println( "工人  "+number+"  结束工作  ,  剩余空缺人:"+semaphore.availablePermits());

} catch (InterruptedException e) {

e.printStackTrace();

}


}

//工作

public void work(){

try {

//抬起箱子总共花费时间 3s

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

>基于google RateLimiter实现

package cn.yunlai.bus.limit;

import java.io.PrintWriter;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Properties;

import java.util.concurrent.ConcurrentHashMap;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import org.springframework.web.servlet.HandlerInterceptor;

import org.springframework.web.servlet.ModelAndView;

import org.springframework.web.servlet.mvc.condition.PatternsRequestCondition;

import org.springframework.web.util.UrlPathHelper;

 

import com.google.common.base.Joiner;

import com.google.common.util.concurrent.RateLimiter;

 

/**

 * spring请求限流器 可对全局请求或url表达式请求进行限流

 * 内部使用spring DispatcherServlet的匹配器PatternsRequestCondition 进行url的匹配。

 *

 * @author Rain

 * @version 1.0.0

 * @time 2018/10/31

 */

@Component

public class RateLimitInterceptor implements HandlerInterceptor {

 

private static final Logger logger = LoggerFactory.getLogger(RateLimitInterceptor.class);

 

private RateLimiter globalRateLimiter;

 

private Map<PatternsRequestCondition, RateLimiter> urlRateMap;

private UrlPathHelper urlPathHelper;

private int globalRate;

@Autowired

private URLLimitMapping urlLimitMapping;

 

public URLLimitMapping getUrlLimitMapping() {

return urlLimitMapping;

}

 

public void setUrlLimitMapping(URLLimitMapping urlLimitMapping) {

this.urlLimitMapping = urlLimitMapping;

}

 

public RateLimitInterceptor() {

this(0);

}

 

/**

 * @param globalRate

 *            全局请求限制qps

 */

public RateLimitInterceptor(int globalRate) {

urlPathHelper = new UrlPathHelper();

this.globalRate = globalRate;

if (globalRate > 0)

globalRateLimiter = RateLimiter.create(globalRate);

}

 

/**

 * @param globalRate

 *            全局请求限制qps

 */

public void setGlobalRate(int globalRate) {

this.globalRate = globalRate;

if (globalRate > 0)

globalRateLimiter = RateLimiter.create(globalRate);

}

 

/**

 * url限流

 *

 * @param urlProperties

 *            url表达式->qps propertis

 */

public void setUrlMaps(URLLimitMapping limitMappings) {

if (urlRateMap == null)

urlRateMap = new ConcurrentHashMap();

fillRateMap(limitMappings.getLimitMaps(), urlRateMap);

}

 

/**

 * url表达转换为PatternsRequestCondition,并生成对应RateLimiter 保存

 *

 * @param controllerProperties

 * @param map

 */

private void fillRateMap(Properties controllerProperties, Map<PatternsRequestCondition, RateLimiter> map) {

if (controllerProperties != null) {

for (String key : controllerProperties.stringPropertyNames()) {

String value = controllerProperties.getProperty(key);

if (value.matches("[0-9]*[1-9][0-9]*")) {

map.put(new PatternsRequestCondition(key), RateLimiter.create(Double.valueOf(value)));

} else {

logger.error(key + " 的值" + value + " 不是一个合法的限制");

}

}

}

}

 

/**

 * url表达转换为PatternsRequestCondition,并生成对应RateLimiter 保存

 *

 * @param limitMappings

 * @param map

 */

private void fillRateMap(Map<String, Integer> limitMappings, Map<PatternsRequestCondition, RateLimiter> map) {

if (limitMappings != null) {

for (Entry<String, Integer> entry : limitMappings.entrySet()) {

map.put(new PatternsRequestCondition(entry.getKey()),

RateLimiter.create(Double.valueOf(entry.getValue())));

}

 

}

}

@Override

    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

 

    if(urlRateMap == null){

          this.setUrlMaps(urlLimitMapping);// 加载URL限流配置

          this.setGlobalRate(urlLimitMapping.getGlobalRate());

          this.globalRate=urlLimitMapping.getGlobalRate();

          

    }       

            String lookupPath = urlPathHelper.getLookupPathForRequest(request);

            for (PatternsRequestCondition patternsRequestCondition : urlRateMap.keySet()) {

                //使用spring DispatcherServlet的匹配器PatternsRequestCondition进行匹配

                List<String> matches = patternsRequestCondition.getMatchingPatterns(lookupPath);

                if (!matches.isEmpty()) {

                    if (urlRateMap.get(patternsRequestCondition).tryAcquire()) {

                    logger.warn(lookupPath + " 请求匹配到" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器");

                    } else {

                    logger.error(lookupPath + " 请求超过" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器速率");

                    response.reset();//设置编码格式

                    response.setCharacterEncoding("UTF-8");

                    response.setContentType("application/json;charset=UTF-8");

                    PrintWriter pw = response.getWriter();

                    pw.write("拦截器拦截,请求超过限流控制");

                    pw.flush();

                        pw.close();

                        return false;

                    }

                }           

        }

 

        //全局限流

        if (globalRateLimiter != null) {

            if (!globalRateLimiter.tryAcquire()) {

            response.reset();//设置编码格式

            response.setCharacterEncoding("UTF-8");

            response.setContentType("application/json;charset=UTF-8");

            PrintWriter pw = response.getWriter();

            pw.write("拦截器拦截,请求超过全局限流控制");

            pw.flush();

                pw.close();

                return false;

            }

        }

        return true;

    }

@Override

public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,

ModelAndView modelAndView) throws Exception {

}

@Override

public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)

throws Exception {

 

}

}

package cn.yunlai.bus.limit;

import java.util.Map;

import org.springframework.stereotype.Component;

/**

 * @author Rain

 * @version 1.0.0

 * @time 2018/10/31

 */

@Component

public class URLLimitMapping {

private Integer globalRate;

public Integer getGlobalRate() {

return globalRate;

}

public void setGlobalRate(Integer globalRate) {

this.globalRate = globalRate;

}

private Map<String, Integer> limitMaps;

public Map<String, Integer> getLimitMaps() {

return limitMaps;

}

public void setLimitMaps(Map<String, Integer> limitMaps) {

this.limitMaps = limitMaps;

}

 

}

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:task="http://www.springframework.org/schema/task" xmlns="http://www.springframework.org/schema/beans"

xsi:schemaLocation="http://www.springframework.org/schema/beans     http://www.springframework.org/schema/beans/spring-beans.xsd

       http://www.springframework.org/schema/aop

       http://www.springframework.org/schema/aop/spring-aop.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx.xsd

       http://www.springframework.org/schema/context

       http://www.springframework.org/schema/context/spring-context.xsd

       http://www.springframework.org/schema/task          

   http://www.springframework.org/schema/task/spring-task.xsd">

<bean id="urlLimitMapping" class="cn.yunlai.bus.limit.URLLimitMapping">

        <!-- 全局限流配置 -->

        <property name="globalRate" value="5"></property>

         <!-- 基于不同业务请求路径限流配置 -->

<property name="limitMaps">

<map>

<entry key="/bus/hi" value="5" />

<entry key="/bus/hi2" value="5" />

<entry key="/bus/hi3" value="6" />

</map>

</property>

</bean>

</beans>

基于web端的xml配置对应的全局限流配置和制定限流url配置,在容器加载起来后,通过拦截器进行链式过滤。这样有效起到保护后端应用不被冲垮的危险。

>Redis 计数器

对于集群级别的限流, 可以利用 Redis 来存储计算器, 比如我们想对某一个API 进行限流, 阈值为 100 CPS。

Redis 存储一张哈希表, key 名为 counter_<api_endpoint_name>_cps:timestamp 为当前时间 System.currentTimeMillis() 除以1000

例如

set counter_check_health_cps:1558962905 1

"OK"

INCRBY counter_check_health_cps:1558962905 1

2

>Spring Cloud Zuul Route Filter

Spring Cloud 的开源网关项目 Zuul , 它基于过滤器模式提供若干过滤器的实现, 对于 Rate Limit 的也有一个开源的实现

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-netflix-zuul</artifactId></dependency><dependency>

    <groupId>com.marcosbarbero.cloud</groupId>

    <artifactId>spring-cloud-zuul-ratelimit</artifactId>

<version>2.2.0.RELEASE</version>

</dependency>

 

 

总结

限流虽好,但有风险,君实行需谨慎,处理“被干预掉”的流量。能不能直接丢弃?不能的话该如何处理?当确认需要限流后,那应该以什么样的方案来一步步演进呢?

不管是查询业务的限流还是更新业务都是有损的,对于产品体验,前后优雅提示都是需要综合平衡。

通过「压力测试」等方式获得系统的能力上限在哪个水平是第一步。其次,就是制定干预流量的策略。比如标准该怎么定、是否只注重结果还是也要注重过程的平滑性。


以上是关于微服务-限流架构设计的主要内容,如果未能解决你的问题,请参考以下文章

微服务架构中的熔断器设计与实现( Golang 版)

分布式系统架构设计三十六式之服务治理 – 横向限流模式

浅谈微服务架构——容错模式2

微服务架构分布式限流策略

最全架构设计实践方法论: 微服务

架构设计之「服务限流」