Sentinel的基本使用-热点参数限流
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sentinel的基本使用-热点参数限流相关的知识,希望对你有一定的参考价值。
一、热点参数限流介绍
这一篇我们主要来介绍下热点参数限流,这个介绍我们看下官方的描述:
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。热点参数限流支持集群模式。
这里我们简单理解下,就是对资源上面的参数进行流量控制,例如我们前面基于QPS
进行限流时使用的获取Entry
方法:
SphU.entry(KEY);
public static Entry entry(String name) throws BlockException
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
可以看到我们只需要传入资源名称就是可以,但我们的热点参数就多了一个概念,就是参数,其的获取是资源
上面对应的参数
:
SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
throws BlockException
return Env.sph.entry(name, trafficType, batchCount, args);
我们主要是两个传入资源以及对应的参数,上面其他的两个参数:EntryType
表示类型是入口流量还是出口流量,另一个batchCount
表示本次获取的资源数量,例如如果我们设置的count
是4,如果batchCount
是2,获取后count
就还只有2。
这个简单理解SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex])
就是:资源就是获取到流量后需要调用的方法,而参数就是这个方法需要的入参。
二、demo介绍
我们看下规则配置。
1、参数
热点参数规则(ParamFlowRule
)类似于流量控制规则(FlowRule
):
属性 | 说明 | 默认值 |
---|---|---|
resource | 资源名,必填 | |
count | 限流阈值,必填 | |
grade | 限流模式 | QPS 模式 |
durationInSec | 统计窗口时间长度(单位为秒),1.6.0 版本开始支持 | 1s |
controlBehavior | 流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持 | 快速失败 |
maxQueueingTimeMs | 最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持 | 0ms |
paramIdx | 热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置 | |
paramFlowItemList | 参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型 | |
clusterMode | 是否是集群参数流控规则 | false |
clusterConfig | 集群流控相关配置 |
public static void initRule()
ParamFlowRule paramFlowRule = new ParamFlowRule();
paramFlowRule.setResource(Resource_Key);
paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
paramFlowRule.setCount(3);
//允许的最大突发请求
// paramFlowRule.setBurstCount(10);
paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
//统计的周期
paramFlowRule.setDurationInSec(1);
paramFlowRule.setParamIdx(0);
List<ParamFlowRule> paramFlowRules = new ArrayList<>();
paramFlowRules.add(paramFlowRule);
ParamFlowRuleManager.loadRules(paramFlowRules);
这里我们可以看到其的参数设置与流量控制是差不多的。但这里还多了一个paramFlowRule.setParamIdx(0)
,这个就是参数的位置,你这条规则是控制方法上面的哪个参数,然后paramFlowRule.setBurstCount(10)
是应对突发流量的(使用的是令牌桶算法,允许突发流量获取到令牌,这个就是设置最大能允许获取到的令牌树),例如如果你一般允许的Count
是3,但如果流量突然增加到16,这个时候能允许超出限制给出10个令牌,但第16就获取不到。
这里我们可以理解资源为一个方法,其有一个参数:
public void doSomething(String name)
我们规则控制的就是第一个参数。
2、运行结果解释
然后我们来看下运行方法:
public class ParamFlowMain
public static String Resource_Key = "paramResource";
//public static String[] ParamArray = new String[]"A","B","C";
public static String ItemParamKey = ParamArray[1];
public static void main(String[] args)
initRule();
for (int i = 0; i < 12; i++)
Entry entry = null;
int arrayIndex = i%3;
try
entry = SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
System.out.println("param " + ParamArray[arrayIndex] + " ---------doSomething");
catch (BlockException blockException)
System.out.println("param " + ParamArray[arrayIndex] + " ---------Blocked");
finally
if (Objects.nonNull(entry))
entry.exit();
public static void initRule()
............
这里我们的一些方法,ParamArray
数组可以理解为传给doSomething(String name)
方法的入参name
的3个不同的值。
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------Blocked
param C ---------Blocked
我们可以看到3个参数每种只能通过3
次。同时这里又引申到另一个问题,我如果想设置参数的某个值为另外的count
呢,这个时候我们需要增加另一部分逻辑:
public static void initRule()
ParamFlowRule paramFlowRule = new ParamFlowRule();
paramFlowRule.setResource(Resource_Key);
paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
paramFlowRule.setCount(3);
//允许的最大突发请求
// paramFlowRule.setBurstCount(10);
paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
//统计的周期
paramFlowRule.setDurationInSec(1);
paramFlowRule.setParamIdx(0);
//例外的参数配置
ParamFlowItem paramFlowItem = new ParamFlowItem();
paramFlowItem.setCount(4);
//参数类型
paramFlowItem.setClassType(String.class.getTypeName());
//具体的参数值
paramFlowItem.setObject(ItemParamKey);
List<ParamFlowItem> paramFlowItemList = new ArrayList<>();
paramFlowItemList.add(paramFlowItem);
paramFlowRule.setParamFlowItemList(paramFlowItemList);
List<ParamFlowRule> paramFlowRules = new ArrayList<>();
paramFlowRules.add(paramFlowRule);
ParamFlowRuleManager.loadRules(paramFlowRules);
也就是在ParamFlowRule
增加ParamFlowItem
,用它来控制特殊的值的控制。这里我们将ParamArray[1]
也就是B
控制的流量为4:
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------doSomething
param C ---------Blocked
我们可以看到这次param B
第4次是能通过的。
三、源码简单梳理
下面我们来简单梳理下源码里面的处理:
1、ParamFlowSlot
处理热点参数的Slot
是ParamFlowSlot
:
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode>
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName()))
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
其首先是判断当前资源有没有设置对应的规则,如果没有就直接放行运行fireEntry
,而不会checkFlow
。
public static boolean hasRules(String resourceName)
List<ParamFlowRule> rules = PARAM_FLOW_RULES.get(resourceName);
return rules != null && !rules.isEmpty();
这个目前就有我们设置的ParamFlowRule
,然后就会check
:
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException
if (args == null)
return;
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName()))
return;
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules)
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args))
String triggeredParam = "";
if (args.length > rule.getParamIdx())
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
主要就是对ParamFlowRule
的判断,不通过就抛出ParamFlowException
(BlockException
的子类)
2、ParamFlowChecker
public final class ParamFlowChecker
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args)
if (args == null)
return true;
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx)
return true;
// Get parameter value.
Object value = args[paramIdx];
..........
return passLocalCheck(resourceWrapper, rule, count, value);
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value)
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS)
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
else
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value))
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
return true;
再之后这里的判断就会看你是哪种控制行为,进行不通的判断,我们当前是走的passDefaultLocalCheck
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value)
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null)
return true;
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value))
tokenCount = rule.getParsedHotItems().get(value);
if (tokenCount == 0)
return false;
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount)
return false;
while (true)
......
这里我们注意两个计算tokenCounters
、timeCounters
,tokenCounters
记录是参数当前还能获取到的令牌树,timeCounters
是记录时间段的开始时间。然后可以看到long maxCount = tokenCount + rule.getBurstCount();
,最大许可数是使用到了BurstCount
。
然后下面就是具体的计算统计。这里我们简单梳理其中一个情况吧:
while (true)
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null)
// Token never added, just replenish the tokens and consume @code acquireCount immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000)
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null)
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
else
long restQps = oldQps.get();
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0)
return false;
if (oldQps.compareAndSet(restQps, newQps))
lastAddTokenTime.set(currentTime);
return true;
Thread.yield();
else
AtomicLon以上是关于Sentinel的基本使用-热点参数限流的主要内容,如果未能解决你的问题,请参考以下文章
手把手带你领略双十一背后的核心技术Sentinel之热点参数限流
SpringCloudSpring Cloud Alibaba 之 Sentinel热点参数限流与系统自适应限流(三十二)
SpringCloudSpring Cloud Alibaba 之 Sentinel热点参数限流与系统自适应限流(三十二)