RateLimiter令牌桶算法浅析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RateLimiter令牌桶算法浅析相关的知识,希望对你有一定的参考价值。

参考技术A

百度百科中的定义:
令牌桶算法是网络流量(Traffic Shaping)整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。

传送到令牌桶的数据包需要消耗令牌。不同大小的数据包,消耗的令牌数量不一样。令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。

令牌桶算法的基本过程:
假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中;桶最多可以存发b个令牌。如果令牌到达时令牌桶已满,则这个令牌会被丢弃;
当一个n个字节的数据包到达时,就从令牌桶中删除n个令牌,并且数据包被发送到网络;
如果令牌桶中少于n个令牌,则不会删除令牌,并且认为这个数据包在流量限制之外;
算法允许最长b个字节的突发,数据包的速率被限制成常量r。对于在流量限制外的数据包可以以不同的方式处理:
1)被丢弃;
2)放在队列中当令牌桶中累积了足够多的令牌时再传输;
3)继续发送,但需要做特殊标记,网络过载时将这些特殊标记的包丢弃。

令牌桶算法与漏桶算法(Leaky Bucket)的主要区别:
1)漏桶算法能够强行限制数据的传输速率,而令牌桶算法在能够限制数据的平均传输速率外,还允许某种程度的突发传输。
2)令牌桶算法中,只要令牌桶中存在令牌,就允许突发地传输数据直到达到用户配置的上限,它适合于具有突发特性的流量。

RateLimiter是Guava中开源的一个令牌桶算法工具类,可以轻松实现限流工作。
RateLimiter有两个实现类:SmoothBursty和SmoothWarmingUp;
两者区别:
1)都是令牌桶算法的变种实现
2)SmoothBursty加令牌的速度是恒定的,SmoothWarmingUp会有个预热期,在预热期内加令牌的速度是慢慢增加的,直到达到固定速度为止。其适用场景是,对于有的系统而言刚启动时能承受的QPS较小,需要预热一段时间后才能达到最佳状态。

测试示例:

示例1:创建一个令牌桶,每秒生成一个令牌,申请失败立即返回。使用CountdownLatch计数器模拟多线程并发,调用await()方法阻塞当前线程,当计数完成后,唤醒所有线程并发执行。

示例2:创建一个令牌桶,每秒生成0.1个令牌,即每10s才会有一个令牌,超时时间设置成20s,20s内获取不到令牌返回失败,20s内可以生成2个令牌,加上创建时桶里会有一个令牌,超时前最终会有3条线程拿到令牌,并且每个令牌获取时间相隔10s。使用CountdownLatch计数器模拟多线程并发:调用await()方法阻塞当前线程,当计数完成后,唤醒所有线程并发执行。

参考文档:
https://baike.baidu.com/item/%E4%BB%A4%E7%89%8C%E6%A1%B6%E7%AE%97%E6%B3%95/6597000?fr=aladdin
https://blog.csdn.net/unclecoco/article/details/99583154

coding++:Semaphore—RateLimiter-漏桶算法-令牌桶算法

java中对于生产者消费者模型,或者小米手机营销 1分钟卖多少台手机等都存在限流的思想在里面。

关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)

Semaphore:从线程个数限流

RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法

令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据

应用场景:

漏桶算法:必须读写分流的情况下,限制读取的速度

令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000实现的方法都是一样。

RateLimiter来实现对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题

1、关于RateLimter和Semphore简单用法

package concurrent;
 
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.*;
import java.util.stream.IntStream;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
 * Semaphore:从线程个数限流
 * RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-15 22:44
 **/
public class RateLimiterExample {
 
   //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
    private final static RateLimiter limiter=RateLimiter.create(0.5d);
 
 
    //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
    private final static Semaphore sem=new Semaphore(3);
    private static void testSemaphore(){
        try {
            sem.acquire();
            System.out.println(currentThread().getName()+" is doing work...");
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            sem.release();
            System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
        }
    }
 
    public static void runTestSemaphore(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 这种写法是创建一个线程
            service.submit(RateLimiterExample::testSemaphore);
        });
    }
 
    /**
     * Guava的RateLimiter
     */
    private static void testLimiter(){
        System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
    }
 
    //Guava的RateLimiter
    public static void runTestLimiter(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 这种写法是创建一个线程
            service.submit(RateLimiterExample::testLimiter);
        });
    }
 
 
 
    public static void main(String[] args) {
        IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9
        //runTestLimiter();
        runTestSemaphore();
    }
}

2、实现漏桶算法

package concurrent.BucketAl;
 
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 22:42
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 **/
public class Bucket {
    //定义桶的大小
    private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();
 
    private final static int  BUCKET_LIMIT=1000;
 
    //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
    private final RateLimiter consumerRate=RateLimiter.create(10d);
 
    //往桶里面放数据时,确认没有超过桶的最大的容量
    private Monitor offerMonitor=new Monitor();
 
    //从桶里消费数据时,桶里必须存在数据
    private Monitor consumerMonitor=new Monitor();
 
 
    /**
     * 往桶里面写数据
     * @param data
     */
    public void submit(Integer data){
        if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
            try {
                container.offer(data);
                System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
            } finally {
                offerMonitor.leave();
            }
        }else {
            //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
            //或者存入MQ DB等后续处理
            throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
        }
    }
 
 
    /**
     * 从桶里面消费数据
     * @param consumer
     */
    public void takeThenConsumer(Consumer<Integer> consumer){
        if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
            try {
                //不打印时 写 consumerRate.acquire();
                System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
                Integer data = container.poll();
                //container.peek() 只是去取出来不会删掉
                consumer.accept(data);
            }finally {
                consumerMonitor.leave();
            }
        }else {
            //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
            System.out.println("will consumer Data from MQ...");
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
}

2.1 漏桶算法测试类

package concurrent.BucketAl;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 23:11
 * 漏桶算法测试
 * 实现漏桶算法 实现多线程生产者消费者模型 限流
 **/
public class BuckerTest {
 
    public static void main(String[] args) {
        final Bucket bucket = new Bucket();
        final AtomicInteger DATA_CREATOR = new AtomicInteger(0);
 
        //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                for (; ; ) {
                    int data = DATA_CREATOR.incrementAndGet();
                    try {
                        bucket.submit(data);
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (Exception e) {
                        //对submit时,如果桶满了可能会抛出异常
                        if (e instanceof IllegalStateException) {
                            System.out.println(e.getMessage());
                            //当满了后,生产线程就休眠1分钟
                            try {
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
                }
            }).start();
        });
 
 
        //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
        IntStream.range(0, 10).forEach(i -> {
            new Thread(
                    () -> {
                        for (; ; ) {
                            bucket.takeThenConsumer(x -> {
                                System.out.println(currentThread()+"C.." + x);
                            });
                        }
                    }
            ).start();
        });
 
    }
}

3、令牌桶算法

package concurrent.TokenBucket;
 
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:18
 * 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
 * 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
 *
 * 应用场景:
 * 漏桶算法:必须读写分流的情况下,限制读取的速度
 * 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
 *
 * 实现的方法都是一样。RateLimiter来实现
 * 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
 **/
public class TokenBuck {
 
    //可以使用 AtomicInteger+容量  可以不用Queue实现
   private AtomicInteger phoneNumbers=new AtomicInteger(0);
   private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次
   //默认销售500台
   private final static int DEFALUT_LIMIT=500;
   private final int saleLimit;
 
    public TokenBuck(int saleLimit) {
        this.saleLimit = saleLimit;
    }
 
    public TokenBuck() {
        this(DEFALUT_LIMIT);
    }
 
    public int buy(){
        //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)
        //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success
        //里面也需要加上double check
       /* if (phoneNumbers.get()>=saleLimit){
            throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
        }*/
 
        //目前设置超时时间,10秒内没有抢到就抛出异常
        //这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算
         boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
         if (success){
             if (phoneNumbers.get()>=saleLimit){
                 throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
             }
             int phoneNo = phoneNumbers.getAndIncrement();
             System.out.println(currentThread()+" user has get :["+phoneNo+"]");
             return phoneNo;
         }else {
             //超时后 同一时间,很大的流量来强时,超时快速失败。
             throw new RuntimeException(currentThread()+"has timeOut can try again...");
         }
 
    }
}

3.1、令牌桶算法的测试类

package concurrent.TokenBucket;
 
import java.util.stream.IntStream;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:40
 **/
public class TokenBuckTest {
    public static void main(String[] args) {
        final TokenBuck tokenBuck=new TokenBuck(200);
 
 
        IntStream.range(0,300).forEach(i->{
            //目前测试时,让一个线程抢一次,不用循环抢
            //tokenBuck::buy 这种方式 产生一个Runnable
            new Thread(tokenBuck::buy).start();
        });
    }
}

 

 

 

 

以上是关于RateLimiter令牌桶算法浅析的主要内容,如果未能解决你的问题,请参考以下文章

RateLimiter 的底层实现是啥?

高并发学习之使用RateLimiter实现令牌桶限流

限流算法之漏桶算法令牌桶算法

RateLimiter

coding++:高并发解决方案限流技术-使用RateLimiter实现令牌桶限流-Demo

RateLimiter 的底层实现是啥?