Sentinel的基本使用-流量控制及其源码调用分析

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sentinel的基本使用-流量控制及其源码调用分析相关的知识,希望对你有一定的参考价值。

首先Sentinel的一些基本介绍就不多说了,其是阿里开源的用于流量控制熔断降级的这些高并发下的一些控制。下面我们直接看使用的案例。

​ 首先在项目中我们引入对应的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.3</version>
</dependency>

一、流量控制

1、QPS方式

public class SimpleEntryMain 

    private static String resourceName = "simpleEntry";

    public static void main(String[] args) throws InterruptedException 
        initFlowRule();
        for (int i = 0; i < 10; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
            Thread.sleep(50);
        
    

    private static void initFlowRule() 
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(resourceName);
        rule1.setCount(2);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    

    static class RunTask implements Runnable 
        @Override
        public void run() 
                Entry entry = null;
                try 
                    entry = SphU.entry(resourceName);
                    doSomeThing();
                 catch (BlockException e1) 
                    System.out.println("Block   ----------");
                 finally 
                    if (entry != null) 
                        entry.exit();
                    
                
        
    

    public static void doSomeThing()
        System.out.println("++++++++++++++++++");
        try 
            Thread.sleep(3000);
         catch (InterruptedException e) 
            // ignore
        
    

​ 这个demo首先是通过initFlowRule()方法设置了规则,然后我们通过多线程来模拟多个调用规则,方法的调用我们简单的模拟花费Thread.sleep(3000)也就是3秒,避免线程运行的太快、很快就退出了,可能就看不到我们阻塞效果。然后线程中运行的就是Sentinel的使用:

Entry entry = null;
try 
    entry = SphU.entry(resourceName);
    doSomeThing();
 catch (BlockException e1) 
    System.out.println("Block   ----------");
 finally 
    if (entry != null) 
        entry.exit();
    

​ 这里就是try-catch-finally的标准使用流程,其主要是通过SphU.entry(resourceName)的调用在里面解析规则的配置校验,如果能通过的话就不会抛出异常,如果不能通过就抛出BlockException异常。我们上面是直接使用,当我们在SpringBoot中通过直接使用的话,其本身的封装就可以使用动态代理的方式来讲这里的doSomeThingSphU.entry(resourceName)的获取做解耦,我们加了注解后就可以直接专注于我们的doSomeThing()的方法逻辑。

2、demo介绍

​ 上面就是基于QPS方式来限流,首先是我们设置对应的规则:

private static void initFlowRule() 
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resourceName);
    rule1.setCount(2);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);

​ 这里我们主要是设置了4个参数:

1、resource

​ 这个是表示当前规则控制的是哪个资源。这个与SphU.entry(resourceName)是对应的,当你在通过其看能不能获取规则的时候,其就是通过resourceName来匹配你前面设置的规则的。

2、count

​ 这个是规则的数量现在,例如你如果是基于QPS的方法的话,这个count就表示一秒内只能允许两个请求通过。

3、grade

​ 这个是表示你选择的哪种流控规则。例如QPS、或同时运行的线程数。

/**
 * The threshold type of flow control (0: thread count, 1: QPS).
 */
private int grade = RuleConstant.FLOW_GRADE_QPS;
public final class RuleConstant 

    public static final int FLOW_GRADE_THREAD = 0;
    public static final int FLOW_GRADE_QPS = 1;

4、limitApp

​ 这个是表示规则现在的哪个应用的资源,应该是用于规则只作用在对应的app上面,不过你本身练习或单应用,并不需要主动来设置,因为其在new FlowRule()的时候就自动设置了:

public FlowRule() 
    super();
    setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);

public static final String LIMIT_APP_DEFAULT = "default";

3、线程启动之间Thread.sleep(50)的原因

​ 上面的注意点主要有我们故意在开启10个线程的时候,在它们之间加了Thread.sleep(50),如果不加这个可能就没有柱塞的效果,例如如果没有加这个:

++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++

​ 可以看到没有阻塞,如果我们加了:

++++++++++++++++++
++++++++++++++++++
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------

​ 当然如果有加Thread.sleep(50),打印也可能是其他的顺序,这个看CPU的具体运行。

​ 我们之所以要在线程启动之间加这个运行间隔,是因为Sentinel本身的计算原理。一个是sentinel是否能通过判断与对应的统计数量+1中间并不是原子性的。Sentinel本身的运行时一个链式执行结构,因为其本身不单只有流量控制规则、还是熔断降级等其他的规则,以及一些统计运算,这些都是封装在ProcessorSlot接口中,然后通过设置next就这样一个、一个的链上了。

1、判断与统计+1之间只有可见性,而没有原则性

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> 

    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable 
        if (next != null) 
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        
    

​ 例如我们的流控节点就是FlowSlot在处理、熔断降级是DegradeSlot处理的,然后我们是否能通过的数据就是StatisticSlot在处理:

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> 
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable 
        try 
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

        	// Request passed, add thread count and pass count.
        	node.increaseThreadNum();
        	node.addPassRequest(count);
			..........
	
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> 
	@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable 
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DskkYYo5-1649149254768)(F:\\MarkDown-File\\知识点资料\\Sentinel使用.assets\\image-20220404141725878.png)]

​ 可以看到我们的判断是在FlowSlot中处理的,然后对于节点的计数是在StatisticSlot中处理的,而这组件是没有同步控制的,

输入计数是使用了LongAdder实现可见性。

public class MetricBucket 

    private final LongAdder[] counters;
private LongAdder curThreadNum = new LongAdder();

所以在并发的时候,可能就是多个线程在判断的时候,其他线程也是判断,+1操作可能还没有运行。

我们可以修改下线程启动的线程的方式:

public static void main(String[] args) throws InterruptedException 
//        initFlowQpsRule();
        initFlowRule();
        for (int i = 0; i < 5; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        
        Thread.sleep(100);
        for (int i = 0; i < 5; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        
    

​ 我们将10个线程拆分为两组,在之间睡眠100ms

++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block   ----------
Block   ----------
Block   ----------
Block   ----------
Block   ----------

​ 可以看到前面5个并没有是2+3方式(两个通过、3个阻塞),但后面的5个都被限流了。基于这个分段、并且Sentinel是滑动窗口的方式我们可以联想到另一个原因,就是窗口分段区间的问题。

4、默认的滑动窗口逻辑。

Sentinel本身是使用的滑动窗口进行调用计数的,当在调用entry方法获取资源的时候,其就会计数。这个计数就是通过滑动窗口处理的。其逻辑例如QPS的话,其滑动的单位就是1000ms,然后其内部又会将这个1000ms划分为更多的段,然后再进行数值判断。

例如限流规则使用QPS规则的话,默认是将1秒分为两个区间,也就是每段500ms也就是0.5秒。例如我们在获取数量判断的时候,如果是在B1区间(也就是500-1000)的时候,其就会将B0-B1两个空间的值加起来,而当我们在B3的时候,Sentinel就通过当前的时候就能判断这个时候B0已经失效了,就会将B0失效,生成新的数据端描叙B2,判断数量的会就会是B1+B2

*     B0       B1      B3
* ||_______|_______|_______||___
* 0       500     1000    1500   timestamp
*      ^
*      time=300

​ 这里在我们开始的时候也就是从第0秒开始。然后我们再修改原来的线程启动间隔,并且我们这里将线程改为2+1+2的数量(减少线程数量,是为了防止中间打印太多字符不好分析逻辑,但这里其实按比例拓展线程数量以及流量控制数量都是可以的)。

public static void main(String[] args) throws InterruptedException 
        initFlowRule();
        for (int i = 0; i < 1; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        
        Thread.sleep(500);
        for (int i = 0; i < 1; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        
        Thread.sleep(500);
        for (int i = 0; i < 2; i++) 
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
            Thread.sleep(50);
        
    
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block   ----------

​ 我们可以看到这个demo的逻辑,是想说明Sentinel对滑动窗口桶的划分逻辑,Sentinel默认是将1s划分为两个桶,也就是每个桶为500ms,所以我们上面的逻辑就说明了,在B0获取一个了,然后B1也能获取一个,但由于B2是在500-1500这个区间,由这两个桶计算所得,所以在1000-1500这里就只能获取一个,阻塞一个。

public class StatisticNode implements Node 

    /**
     * Holds statistics of the recent @code INTERVAL milliseconds. The @code INTERVAL is divided into time spans
     * by given @code sampleCount.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
public static volatile int SAMPLE_COUNT = 2;
public class ArrayMetric implements Metric 

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) 
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    
public LeapArray(int sampleCount, int intervalInMs) 
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);

例如在StatisticNode计算有多少通过的时候就是通过ArrayMetric里面的桶通过的数量相加:

@Override
public double passQps() 
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();

public class ArrayMetric implements Metric 

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) 
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    
		.........
    @Override
    public long pass() 
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) 
            pass += window.pass();
        
        return pass;
    

​ 这里的data.currentWindow();就是计数当前是在哪个桶,如果还没有就创建,感兴趣可以自己具体去看下这个逻辑。然后data.values()就是获取当前时间用于数量的桶。

public MetricBucket() 
    MetricEvent[] events = MetricEvent.values();
    this.counters = new LongAdder[events.length];
    for (MetricEvent event : events) 
        counters[event.ordinal()] = new LongAdder();
    
    initMinRt();

public enum MetricEvent 

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS

​ 这里就记录了通过、阻拦、异常这些信息。这些内容就能用于当前请求是否能通过的判断逻辑:

public class DefaultController implements TrafficShapingController 

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) 
        this.count = count;
        this.grade = grade;
    

    @Override
    public boolean canPass(Node node, int acquireCount) 
        return canPass(node, acquireCount, false);
    

    @Override
    public boolean canPass(Node node, int以上是关于Sentinel的基本使用-流量控制及其源码调用分析的主要内容,如果未能解决你的问题,请参考以下文章

阿里sentinel源码解析

#私藏项目实操分享#Alibaba中间件技术系列「Sentinel技术专题」分布式系统的流量防卫兵的基本介绍(入门源码介绍)

7.Sentinel源码分析—Sentinel是怎么和控制台通信的?

阿里开源Sentinel流控框架基本介绍与简单使用

十五:Sentinel规则持久化实战及其源码分析

4.Sentinel源码分析— Sentinel是如何做到降级的?