Sentinel的基本使用-流量控制及其源码调用分析
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sentinel的基本使用-流量控制及其源码调用分析相关的知识,希望对你有一定的参考价值。
首先Sentinel的一些基本介绍就不多说了,其是阿里开源的用于流量控制
、熔断降级
的这些高并发下的一些控制。下面我们直接看使用的案例。
首先在项目中我们引入对应的依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.3</version>
</dependency>
一、流量控制
1、QPS方式
public class SimpleEntryMain
private static String resourceName = "simpleEntry";
public static void main(String[] args) throws InterruptedException
initFlowRule();
for (int i = 0; i < 10; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
Thread.sleep(50);
private static void initFlowRule()
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(resourceName);
rule1.setCount(2);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
static class RunTask implements Runnable
@Override
public void run()
Entry entry = null;
try
entry = SphU.entry(resourceName);
doSomeThing();
catch (BlockException e1)
System.out.println("Block ----------");
finally
if (entry != null)
entry.exit();
public static void doSomeThing()
System.out.println("++++++++++++++++++");
try
Thread.sleep(3000);
catch (InterruptedException e)
// ignore
这个demo首先是通过initFlowRule()
方法设置了规则,然后我们通过多线程来模拟多个调用规则,方法的调用我们简单的模拟花费Thread.sleep(3000)
也就是3秒。然后线程中运行的就是Sentinel
的使用:
Entry entry = null;
try
entry = SphU.entry(resourceName);
doSomeThing();
catch (BlockException e1)
System.out.println("Block ----------");
finally
if (entry != null)
entry.exit();
这里就是try-catch-finally
的标准使用流程,其主要是通过SphU.entry(resourceName)
的调用在里面解析规则的配置校验,如果能通过的话就不会抛出异常,如果不能通过就抛出BlockException
异常。我们上面是直接使用,当我们在SpringBoot
中直接使用的话,其本身的封装可能使用动态代理或者AOP的方式来将这里的doSomeThing
与SphU.entry(resourceName)
的获取做解耦,我们加了注解后就可以直接专注于我们的doSomeThing()
的方法逻辑。
2、demo介绍
上面就是基于QPS
方式来限流,首先是我们设置对应的规则:
private static void initFlowRule()
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(resourceName);
rule1.setCount(2);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
这里我们主要是设置了4个参数:
1、resource
这个是表示当前规则控制的是哪个资源。这个与SphU.entry(resourceName)
是对应的,当你在通过其看能不能获取规则的时候,其就是通过resourceName
来匹配你前面设置的规则的。
2、count
这个是规则的数量现在,例如你如果是基于QPS
的方法的话,这个count
就表示一秒内只能允许两个请求通过。
3、grade
这个是表示你选择的哪种流控规则。例如QPS
、或同时运行的线程数。
/**
* The threshold type of flow control (0: thread count, 1: QPS).
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
public final class RuleConstant
public static final int FLOW_GRADE_THREAD = 0;
public static final int FLOW_GRADE_QPS = 1;
4、limitApp
这个是表示规则现在的哪个应用的资源,应该是用于规则只作用在对应的app
上面,不过你本身练习或单应用,并不需要主动来设置,因为其在new FlowRule()
的时候就自动设置了:
public FlowRule()
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
public static final String LIMIT_APP_DEFAULT = "default";
3、线程启动之间Thread.sleep(50)的原因
上面的注意点主要有我们故意在开启10个线程的时候,在它们之间加了Thread.sleep(50)
,如果不加这个可能就没有柱塞的效果,例如如果没有加这个:
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
可以看到没有阻塞,如果我们加了:
++++++++++++++++++
++++++++++++++++++
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
当然如果有加Thread.sleep(50)
,打印也可能是其他的顺序,这个看CPU
的具体运行。
我们之所以要在线程启动之间加这个运行间隔,是因为Sentinel
本身的计算原理。一个是sentinel
的是否能通过判断
与对应的统计数量+1
中间并不是原子性的。Sentinel
本身的运行时一个链式执行结构,因为其本身不单只有流量控制规则、还是熔断降级等其他的规则,以及一些统计运算,这些都是封装在ProcessorSlot
接口中,然后通过设置next
就这样一个、一个的链上了。
1、判断与统计+1之间只有可见性,而没有原则性
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T>
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable
if (next != null)
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
例如我们的流控节点就是FlowSlot
在处理、熔断降级是DegradeSlot
处理的,然后我们是否能通过的数据就是StatisticSlot
在处理:
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode>
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable
try
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
..........
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode>
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
可以看到我们的判断是在FlowSlot
中处理的,然后对于节点的计数是在StatisticSlot
中处理的,而这组件是没有同步控制的,
输入计数是使用了LongAdder
实现可见性。
public class MetricBucket
private final LongAdder[] counters;
private LongAdder curThreadNum = new LongAdder();
所以在并发的时候,可能就是多个线程在判断的时候,其他线程也是判断,+1
操作可能还没有运行。
我们可以修改下线程启动的线程的方式:
public static void main(String[] args) throws InterruptedException
// initFlowQpsRule();
initFlowRule();
for (int i = 0; i < 5; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
Thread.sleep(100);
for (int i = 0; i < 5; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
我们将10个线程拆分为两组,在之间睡眠100ms
:
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block ----------
Block ----------
Block ----------
Block ----------
Block ----------
可以看到前面5个并没有是2+3
方式(两个通过、3个阻塞),但后面的5个都被限流了。基于这个分段、并且Sentinel
是滑动窗口的方式我们可以联想到另一个原因,就是窗口分段区间的问题。
4、默认的滑动窗口逻辑。
Sentinel
本身是使用的滑动窗口进行调用计数的,当在调用entry
方法获取资源的时候,其就会计数。这个计数就是通过滑动窗口处理的。其逻辑例如QPS
的话,其滑动的单位就是1000ms
,然后其内部又会将这个1000ms
划分为更多的段,然后再进行数值判断。
例如限流规则使用QPS规则的话,默认是将1秒分为两个区间,也就是每段500ms
也就是0.5秒。例如我们在获取数量判断的时候,如果是在B1
区间(也就是500-1000)的时候,其就会将B0-B1
两个空间的值加起来,而当我们在B3
的时候,Sentinel
就通过当前的时候就能判断这个时候B0
已经失效了,就会将B0
失效,生成新的数据端描叙B2
,判断数量的会就会是B1+B2
。
* B0 B1 B3
* ||_______|_______|_______||___
* 0 500 1000 1500 timestamp
* ^
* time=300
这里在我们开始的时候也就是从第0秒开始。然后我们再修改原来的线程启动间隔,并且我们这里将线程改为2+1+2
的数量(减少线程数量,是为了防止中间打印太多字符不好分析逻辑)。
public static void main(String[] args) throws InterruptedException
initFlowRule();
for (int i = 0; i < 1; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
Thread.sleep(500);
for (int i = 0; i < 1; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
Thread.sleep(500);
for (int i = 0; i < 2; i++)
Thread t = new Thread(new RunTask());
t.setName("simulate-traffic-Task");
t.start();
Thread.sleep(50);
++++++++++++++++++
++++++++++++++++++
++++++++++++++++++
Block ----------
我们可以看到这个demo的逻辑,是想说明Sentinel
对滑动窗口桶的划分逻辑,Sentinel
默认是将1s
划分为两个桶,也就是每个桶为500ms
,所以我们上面的逻辑就说明了,在B0
获取一个了,然后B1
也能获取一个,但由于B2
是在500-1500
这个区间,由这两个桶计算所得,所以在1000-1500
这里就只能获取一个,阻塞一个。
public class StatisticNode implements Node
/**
* Holds statistics of the recent @code INTERVAL milliseconds. The @code INTERVAL is divided into time spans
* by given @code sampleCount.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
public static volatile int SAMPLE_COUNT = 2;
public class ArrayMetric implements Metric
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs)
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
public LeapArray(int sampleCount, int intervalInMs)
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
例如在StatisticNode
计算有多少通过的时候就是通过ArrayMetric
里面的桶通过的数量相加:
@Override
public double passQps()
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
public class ArrayMetric implements Metric
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs)
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
.........
@Override
public long pass()
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list)
pass += window.pass();
return pass;
这里的data.currentWindow();
就是计数当前是在哪个桶,如果还没有就创建,感兴趣可以自己具体去看下这个逻辑。然后data.values()
就是获取当前时间用于数量的桶。
public MetricBucket()
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events)
counters[event.ordinal()] = new LongAdder();
initMinRt();
public enum MetricEvent
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
这里就记录了通过、阻拦、异常这些信息。这些内容就能用于当前请求是否能通过的判断逻辑:
public class DefaultController implements TrafficShapingController
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade)
this.count = count;
this.grade = grade;
@Override
public boolean canPass(Node node, int acquireCount)
return canPass(node, acquireCount, false);
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized阿里sentinel源码解析
#私藏项目实操分享#Alibaba中间件技术系列「Sentinel技术专题」分布式系统的流量防卫兵的基本介绍(入门源码介绍)