干货分享深入理解高可用之限流

Posted 在路上的德尔菲

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了干货分享深入理解高可用之限流相关的知识,希望对你有一定的参考价值。

关于故障

故障产生的原因可以分为以下几大类:

  1. 网络问题:网络链接出现问题、宽带出现堵塞。
  2. 性能问题:数据库慢SQL、Java full gc、硬盘IO过大、CPU飙高、内存不足。
  3. 安全问题:被网络攻击,如DDoS。
  4. 运维问题:系统不断进行更新和修改。
  5. 管理问题:没有梳理出关键服务及服务依赖关系。
  6. 硬件问题:硬盘损坏、网卡故障、机房掉电。

换个角度从流量时间顺序来说,故障来自四个方面:

  1. 上游流量突增。
  2. 系统本身出现问题,比如断电、磁盘打满等。
  3. 人为变更导致,比如发布上线。
  4. 下游服务出现异常。

所以说故障是正常的,而且是常见的,“Everything will fails”

不要尝试着去避免故障,而是把处理故障的代码当成正常的功能做在架构里写在代码中。

降级

降级(Degradation)本质是为了解决资源不足和访问量过大的问题,暂时牺牲掉一些东西以保证整个系统的平稳运行。

对服务调用方来说,服务降级可以在被调用服务不可用时,通过临时的替代方案向上提供有损服务,保证业务柔性可用;

对于服务提供方来说,服务降级极大的减轻了对服务提供方的压力,有助于服务提供方快速恢复。

限流可以认为是服务降级的一种,限流就是限制系统的输入和输出流量来达到保护系统的目的,很多地方都有限流的思想,如数据库连接池,线程池,nginx下的用于限制瞬时并发连接数的limit_conn模块,限制每秒平均速率的limit_req模块。

限制接口每秒可以通过的请求数,超出的流量可以选择延迟处理、部分拒绝或者直接拒绝。

降级是一种思想,限流、熔断是措施。

降级的三种模式:

  1. 强制降级:在强制降级模式下,客户端对某一服务的调用会全部降级,降级后会根据配置的降级策略返回。强制降级的开关需要由开发负责人手动打开和关闭,一般用于在紧急情况下对后端调用做全部降级,并在恢复正常后手动关闭降级开关。
  2. 失败降级:在失败降级模式下,客户端始终会调用后端服务,但仅当调用失败时才进行服务降级,这个时候将根据配置的降级策略返回。
  3. 自动降级:在自动降级模式下,客户端会根据当前的服务调用情况,自动识别是否需要降级。如果需要降级,会根据配置的降级策略返回;如果不需要降级,就进行正常的服务调用。

限流策略

  • 拒绝服务:把多的请求拒绝掉,统计哪个客户端来的流量最多,直接拒绝掉,这种方式可以把一些不正常的或有恶意的高并发访问拦在外面。在Pigeon中,可限制某个客户端应用请求的最大QPS,如果客户端请求QPS超过阈值,服务端返回RejectedException;限制服务端某个服务方法对某个客户端应用请求最大QPS,如.EchoService服务接口的echo方法,对客户端应用account-service的最大单机QPS为200。
  • 服务降级:这样让服务有更多的资源处理更多的请求,一种降级的方式是把一些不重要的服务给停掉,把CPU、内存或是数据的资源让给更重要的功能;另一种是不再返回全量数据,只返回部分数据,以牺牲一致性的方式来获得更大的性能吞吐。
  • 特权请求:资源不够用,只能把有限的资源分给重要的用户,如VIP用户。
  • 延时处理:一般有队列缓冲大量的请求,如果队列满了,那么也只能拒绝用户了,使用缓冲队列只是为了减缓压力,一般用于应对短暂的峰刺请求。

限流的实现方式

  • 计数器方式:简单暴力,直接维护一个计数器counter,限制一秒钟能够通过的请求数,算法的实现思路就是从第一个请求开始计时,在接下来1秒内,当一个请求进来,counter计数加1,若counter大于100,开启拒绝请求以保护系统,后续的请求就会全部拒绝。可以通过AtomicLong#incrementAndGet()方法计数器加1并返回最新值,通过这个值和阈值进行比较。缺点是产生“突刺现象”。
  • 队列算法:优先级队列,先处理高优先级的队列,再处理低优先级的队列。为了避免低优先队列被饿死,一般是分配不同比例的处理时间到不同的队列上,即带权重的队列,如处理权重为3的队列上3个请求后,再去权重为2的队列上处理2个请求,最后再去权重为1的队列上处理1个请求。如果处理过慢,就会导致队列满而开始触发限流。
  • 漏斗算法:一个固定的容量漏斗,按照固定速率流出水滴,如果流入水滴超出漏斗的容量,则流入的水滴溢出(被丢弃)。

  • 令牌桶算法:对于很多应用场景来说,除了要求能够限制数据的平均传输速率,还要求允许某种程度的突发流量,这时候漏斗算法可能就不合适了,可采用令牌桶算法。在一个桶内按照一定的速率放入一些令牌(token),只有拿到令牌才能处理请求;令牌桶有一个容量,当令牌桶满了的时候,再向其中放入令牌则会丢弃。在流量小的时候攒钱,流量大的时候可以快速处理。

漏斗算法和令牌桶算法使用场景不同,并不能说令牌桶一定比漏斗算法好。

令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是让自己不被打垮。所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制),那么实际处理速率可以超过配置的限制。

而漏桶算法,这是用来保护他人,也就是保护他所调用的系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候,我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只有在主调方控制。这个时候,即使流量突发,也必须舍弃。因为消费能力是第三方决定的。

限流策略

  1. 单机限流:单机固定限流,基于令牌桶算法实现,可以应对突发流量,默认桶中会保存设置qps数量的令牌。
  2. 集群限流(非精确):单机限流,通过指定集群的QPS,单机QPS会随着业务扩容缩容,根据业务的机器数动态的计算。
  3. 集群限流(精确):基于Redis的计数实现精确集群限流,每次限流需要和Redis进行通信,有1ms左右的性能损耗,如果Redis挂了,将不进行限流。
  4. 集群限频:内部通过uuid关键字,实现各个uuid的访问频次,比如有用户a,b,c,d… z,访问接口/index,目前的需求是限制每个用户的访问频次 2次/ 5s,可以通过集群限频并基于redis的计数实现,用户id+接口名作为key,过期时间为5s,保存在redis中
  5. 集群配额:和集群限频的实现原理一致,但是每种策略的限流只能创建一个,如果遇到下面的需求,可以使用该策略,例如:需要限制每个用户的访问频次 2次/ 5s,同时还要限制每天的调用总数为2000,对于这二个需求,就可以创建一个集群配额,限制每个用户每天的调用总次数

最佳实践

TODO配置

源码分析

        RateLimiter rateLimiter = RateLimiter.create(2);
        
        boolean res = rateLimiter.tryAcquire();
        
        rateLimiter.getRate();
        
        rateLimiter.setRate(100);
        
        rateLimiter.tryAcquire();


@ThreadSafe
@Beta
public abstract class RateLimiter {

  public static RateLimiter create(double permitsPerSecond) {

    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);
  }


  static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }


  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit);
  }

  @VisibleForTesting
  static RateLimiter create(
      SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

  /**
   * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
   * object to facilitate testing.
   */
  private final SleepingStopwatch stopwatch;

  // Can't be initialized in the constructor because mocks don't call the constructor.
  private volatile Object mutexDoNotUseDirectly;

  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

  RateLimiter(SleepingStopwatch stopwatch) {
    this.stopwatch = checkNotNull(stopwatch);
  }

 
  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

  abstract void doSetRate(double permitsPerSecond, long nowMicros);

  public final double getRate() {
    synchronized (mutex()) {
      return doGetRate();
    }
  }

  abstract double doGetRate();


  public double acquire() {
    return acquire(1);
  }

  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }


  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  /**
   * Acquires a permit from this {@code RateLimiter} if it can be obtained
   * without exceeding the specified {@code timeout}, or returns {@code false}
   * immediately (without waiting) if the permit would not have been granted
   * before the timeout expired.
   *
   * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}.
   *
   * @param timeout the maximum time to wait for the permit. Negative values are treated as zero.
   * @param unit the time unit of the timeout argument
   * @return {@code true} if the permit was acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   */
  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }

 
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }

  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }


  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  /**
   * Returns the earliest time that permits are available (with one caveat).
   *
   * @return the time that permits are available, or, if permits are available immediately, an
   *     arbitrary past or present time
   */
  abstract long queryEarliestAvailable(long nowMicros);


  abstract long reserveEarliestAvailable(int permits, long nowMicros);

  @Override
  public String toString() {
    return String.format("RateLimiter[stableRate=%3.1fqps]", getRate());
  }

  @VisibleForTesting
  abstract static class SleepingStopwatch {

    abstract long readMicros();

    abstract void sleepMicrosUninterruptibly(long micros);

    static final SleepingStopwatch createFromSystemTimer() {
      return new SleepingStopwatch() {
        final Stopwatch stopwatch = Stopwatch.createStarted();

        @Override
        long readMicros() {
          return stopwatch.elapsed(MICROSECONDS);
        }

        @Override
        void sleepMicrosUninterruptibly(long micros) {
          if (micros > 0) {
            Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);
          }
        }
      };
    }
  }

  private static int checkPermits(int permits) {
    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
    return permits;
  }


  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;

    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

以上是关于干货分享深入理解高可用之限流的主要内容,如果未能解决你的问题,请参考以下文章

高可用之限流

Spring Cloud Gateway 之限流操作

Spring Cloud Gateway 之限流操作

高并发之限流实现

SOA架构之限流

高并发系统之限流特技