如何用 Java 来构建一个简单的速率限制器?

Posted Javatutouhouduan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何用 Java 来构建一个简单的速率限制器?相关的知识,希望对你有一定的参考价值。

速率限制

现实世界中的用户是残暴的,并且没耐心,充满着各种不确定性。在高并发系统中,可能会出现服务器被虚假请求轰炸的情况,因此您可能希望控制这种情况。

一些实际使用情形可能如下所示:

  1. API配额管理-作为提供者,您可能希望根据用户的付款情况限制向服务器发出API请求的速率。这可以在客户端或服务端实现。
  2. 安全性-防止DDOS攻击。
  3. 成本控制--这对服务方甚至客户方来说都不是必需的。如果某个组件以非常高的速率发出一个事件,它可能有助于控制它,它可能有助于控制从客户端发送的遥测。

限速处理时的选项

根据我们处理的请求/事件类型,可能会发生以下情况:

  1. 我们可以放弃额外的请求
  2. 我们可以选择让请求等待,直到系统将它们降低到预定义的速率。

常用限速算法

  1. 令牌桶算法
  2. 漏桶算法

我们将不深入讨论这些算法的内部细节,因为这超出了本文的范围。

我们将以令牌桶算法为中心。其要求如下。

令牌桶算法基于以固定速率添加令牌的固定容量桶的类比。在允许API继续之前,将检查桶,以查看它当时是否包含至少一个令牌。如果令牌存在,则进行API调用。如果不是,则丢弃该消息/或使其等待。

需求

  1. 应该能够接受每秒所需的(TPS)事务或速率。
  2. 如果超过我们定义的比率,则应放弃交易。
  3. 应该在同时发生的情况下起作用。

高级功能(在后续文章中实现)

  1. 应该能够平滑突发的请求。例如,如果我们将TPS定义为5,并且所有五个请求都在同一时刻到达,那么它应该能够以固定的时间间隔将它们排成一行,即以200ms的时间间隔执行每个请求。它需要一个内部定时电路。
  2. 如果我们的TPS为5,并且在其中一个1秒的时段中,我们在下一秒只使用3个代币,那么我们应该能够提供5+2 = 7个代币作为奖励。但速率为每个令牌1/7(142.28ms)。奖金不应结转到下一个插槽。

让我们首先定义我们的 速率限制器

/**
 * Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of
 * TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call,
 * or a simple function call.
 * <p>
 * Every @link RateLimiter implementation should implement either @link RateLimiter#throttle(Code) or, @link RateLimiter#enter().
 * They can also choose to implement all.
 * <p>
 * @link Code represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited
 * spreads across multiple functions, we need to use entry() and exit() contract.
 */
public interface RateLimiter 

/**
     * Rate limits the code passed inside as an argument.
     *
     * @param code representation of the piece of code that needs to be rate limited.
     * @return true if executed, false otherwise.
     */
    boolean throttle(Code code);
    /**
     * When the piece of code that needs to be rate limited cannot be represented as a contiguous
     * code, then entry() should be used before we start executing the code. This brings the code inside the rate
     * limiting boundaries.
     *
     * @return true if the code will execute and false if rate limited.
     * <p
     */
    boolean enter();
    /**
     * Interface to represent a contiguous piece of code that needs to be rate limited.
     */
    interface Code 
        /**
         * Calling this function should execute the code that is delegated to this interface.
         */
        void invoke();
    

复制代码

我们的 RateLimit有两组API:一个是throttle(code),另一个是enter()。这两种方法都满足相同的功能,但采用以下两种方式:

  1. boolean throttle(代码)-如果我们有连续的代码,可以用来传递一个代码块。
  2. 布尔输入() - 通常可以在API、DB或任何我们想要节流的调用之前使用。如果执行此代码后面的代码,则将返回 ,以及 假的如果它是速率受限的话。您可以将这些请求排队或拒绝。

在生产环境中您永远不会看到节流(代码)实现,因为它不是最佳的。请在评论中告诉我原因。大多数速率限制器使用类似于enter()的API

核心功能

为了构建速率限制器的核心,我们需要确保在任意两秒之间不允许超过N个事务。我们将如何做到这一点?

考虑我们进行第一笔交易的时刻t0。 t0 .所以,
直到(t0 + 1)s,我们只允许进行N次交易。 (t0 + 1)s , we are allowed to make only N transactions.如何确保这一点?在下次交易时,我们将检查
当前时间≤(t0 + 1)。.如果没有,那么这意味着我们进入了不同的秒,并且我们被允许进行N次交易。 N transactions.让我们看一小段代码,它演示了:

long now = System.nanoTime();
if (now <= mNextSecondBoundary)  // If we are within the time limit of current second
    if (mCounter < N)  // If token available
        mLastExecutionNanos = now;
        mCounter++; // Allocate token
        invoke(code); // Invoke the code passed the throttle method.
    

复制代码

那么,我们如何定义mNextSecondBoundary呢?这将在我们进行第一个事务时完成,如前所述,我们将在完成第一个事务的时刻增加一秒。

if (mLastExecutionNanos == 0L) 
    mCounter++; // Allocate the very first token here.
    mLastExecutionNanos = System.nanoTime();
    mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;  // (10^9)

复制代码

现在,如果我们执行代码并看到我们进入了不同的秒,我们应该怎么做?我们将通过重置上次执行时间、可用令牌数来增强前面的代码,并通过调用 节流阀()再一次。我们的方法已经知道如何处理新的秒。

@Override
public boolean throttle(Code code) 
    if (mTPS <= 0) 
        // We do not want anything to pass.
        return false;
    

synchronized (mLock) 
        if (mLastExecutionNanos == 0L) 
            mCounter++;
            mLastExecutionNanos = System.nanoTime();
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            invoke(code);
            return true;
         else 
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) 
                if (mCounter < mTPS) 
                    mLastExecutionNanos = now;
                    mCounter++;
                    invoke(code);
                    return true;
                 else 
                    return false;
                
             else 
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return throttle(code);
            
        
    

复制代码

在这个实现中,我们可以传递需要节流的代码块,但是这个代码有一个问题。这将工作,但它会表现不佳。不推荐,但为什么呢?请在评论中告诉我。

现在,可以使用相同的构建块和enter()构建第二个API。我们将使用相同的逻辑,但我们不会执行方法内部的代码块。相反,它将在调用enter()之后执行,就像我们执行状态管理一样。该方法的实现如下:

@Override
public boolean enter() 
    if (mTPS == 0L) 
        return false;
    

synchronized (mBoundaryLock) 
        if (mLastExecutionNanos == 0L) 
            mLastExecutionNanos = System.nanoTime();
            mCounter++;
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            return true;
         else 
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) 
                if (mCounter < mTPS) 
                    mLastExecutionNanos = now;
                    mCounter++;
                    return true;
                 else return false;
             else 
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return enter();
            
        
    

复制代码

现在,我们简单的速率限制器已经可以使用了。您可以查看完整的代码 这里

结果

我们将尝试创建一个可创建六个线程的驱动程序代码。每个线程尝试从0到100计数,延迟为50ms(可以设置为任何数字)。我们将按如下方式启动我们的限速器:

public static void main(String[] args) 
    RateLimiter limiter = new SimpleTokenBucketRateLimiter(1);
    Thread[] group = new Thread[6];
    Runnable r = () -> 
        for (int i = 0; i < 100; i++) 
            try 
                Thread.sleep(50);
             catch (InterruptedException e) 
                throw new RuntimeException(e);
            
            if (limiter.enter()) 
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
            
        
    ;


for (int i = 0; i < 6; i++) 
        group[i] = new Thread(r);
        group[i].start();
    

复制代码

我们的API不支持平滑事务,而是让事务等待下一个令牌被分配,而不是丢弃请求。在拒绝它们之后,它返回false,所以如果我们真的想的话,我们可以把它们排队。

if (limiter.enter()) 
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
 else  // queue the work again 
复制代码

这是TPS设置为1时的输出。

当我们尝试将TPS设置为 2我们将看到以下输出:

真管用!

android的角度看

  1. 考虑这样一种情况:您正在编写代码以捕获用户签名。当他们拖动指针时,您会捕获数千个点。平滑签名可能不需要所有这些参数,因此您使用速率限制进行采样。
  2. 一些事件调用频率很高。你能控制的。
  3. 我们有MessageQueue的空闲侦听器。当我们在主线程中侦听它时,它被随意调用。有时候,它在一秒钟内被调用好几次。如果我们想构建一个心跳系统来告诉我们主线程何时空闲,我们可以使用它来接收每秒的事件。如果我们一秒钟内没有收到事件,我们可以假定主线程处于忙碌状态。
  4. 对于您的框架/库的API配额管理,您可以根据用户选择的付款计划情况API调用。

今天先到这里吧。 我们将在后续文章中构建一个更复杂的速率限制器。

java 359.记录器速率限制器(#)。java

public class Logger {
    
    private Map<String, Integer> map;
    /** Initialize your data structure here. */
    public Logger() {
        map = new HashMap<>();
    }
    
    /** Returns true if the message should be printed in the given timestamp, otherwise returns false.
        If this method returns false, the message will not be printed.
        The timestamp is in seconds granularity. */
    public boolean shouldPrintMessage(int timestamp, String message) {
        if(timestamp < map.getOrDefault(message, 0)) {
            return false;
        }
        map.put(message, timestamp + 10);
        return true;
    }
}

/**
 * Your Logger object will be instantiated and called as such:
 * Logger obj = new Logger();
 * boolean param_1 = obj.shouldPrintMessage(timestamp,message);
 */

以上是关于如何用 Java 来构建一个简单的速率限制器?的主要内容,如果未能解决你的问题,请参考以下文章

java 359.记录器速率限制器(#)。java

java 359.记录器速率限制器(#)。java

java 359.记录器速率限制器(#)。java

java 359.记录器速率限制器(#)。java

java 359.记录器速率限制器(#)。java

在 Laravel 中禁用速率限制器?