Sentinel的基本使用-热点参数限流

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sentinel的基本使用-热点参数限流相关的知识,希望对你有一定的参考价值。

一、热点参数限流介绍

​ 这一篇我们主要来介绍下热点参数限流,这个介绍我们看下官方的描述:

何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:

商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。

Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。热点参数限流支持集群模式。

​ 这里我们简单理解下,就是对资源上面的参数进行流量控制,例如我们前面基于QPS进行限流时使用的获取Entry方法:

SphU.entry(KEY);
public static Entry entry(String name) throws BlockException 
    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);

​ 可以看到我们只需要传入资源名称就是可以,但我们的热点参数就多了一个概念,就是参数,其的获取是资源上面对应的参数

SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
    throws BlockException 
    return Env.sph.entry(name, trafficType, batchCount, args);

​ 我们主要是两个传入资源以及对应的参数,上面其他的两个参数:EntryType表示类型是入口流量还是出口流量,另一个batchCount表示本次获取的资源数量,例如如果我们设置的count是4,如果batchCount是2,获取后count就还只有2。

​ 这个简单理解SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex])就是:资源就是获取到流量后需要调用的方法,而参数就是这个方法需要的入参。

二、demo介绍

我们看下规则配置。

1、参数

热点参数规则(ParamFlowRule)类似于流量控制规则(FlowRule):

属性说明默认值
resource资源名,必填
count限流阈值,必填
grade限流模式QPS 模式
durationInSec统计窗口时间长度(单位为秒),1.6.0 版本开始支持1s
controlBehavior流控效果(支持快速失败和匀速排队模式),1.6.0 版本开始支持快速失败
maxQueueingTimeMs最大排队等待时长(仅在匀速排队模式生效),1.6.0 版本开始支持0ms
paramIdx热点参数的索引,必填,对应 SphU.entry(xxx, args) 中的参数索引位置
paramFlowItemList参数例外项,可以针对指定的参数值单独设置限流阈值,不受前面 count 阈值的限制。仅支持基本类型和字符串类型
clusterMode是否是集群参数流控规则false
clusterConfig集群流控相关配置
public static void initRule()
        ParamFlowRule paramFlowRule = new ParamFlowRule();
        paramFlowRule.setResource(Resource_Key);
        paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        paramFlowRule.setCount(3);
        //允许的最大突发请求
//        paramFlowRule.setBurstCount(10);
        paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        //统计的周期
        paramFlowRule.setDurationInSec(1);
        paramFlowRule.setParamIdx(0);
		List<ParamFlowRule> paramFlowRules = new ArrayList<>();
        paramFlowRules.add(paramFlowRule);
        ParamFlowRuleManager.loadRules(paramFlowRules);
    

​ 这里我们可以看到其的参数设置与流量控制是差不多的。但这里还多了一个paramFlowRule.setParamIdx(0),这个就是参数的位置,你这条规则是控制方法上面的哪个参数,然后paramFlowRule.setBurstCount(10)是应对突发流量的(使用的是令牌桶算法,允许突发流量获取到令牌,这个就是设置最大能允许获取到的令牌树),例如如果你一般允许的Count是3,但如果流量突然增加到16,这个时候能允许超出限制给出10个令牌,但第16就获取不到。

​ 这里我们可以理解资源为一个方法,其有一个参数:

public void doSomething(String name)

​ 我们规则控制的就是第一个参数。

2、运行结果解释

然后我们来看下运行方法:

public class ParamFlowMain 

    public static String Resource_Key = "paramResource";

    //public static String[] ParamArray = new String[]"A","B","C";

    public static String ItemParamKey = ParamArray[1];

    public static void main(String[] args) 
        initRule();
        for (int i = 0; i < 12; i++) 
            Entry entry = null;
            int arrayIndex = i%3;
            try 
                entry = SphU.entry(Resource_Key, EntryType.IN, 1, ParamArray[arrayIndex]);
                System.out.println("param " + ParamArray[arrayIndex] + " ---------doSomething");
             catch (BlockException blockException) 
                System.out.println("param " + ParamArray[arrayIndex] + " ---------Blocked");
             finally 
                if (Objects.nonNull(entry)) 
                    entry.exit();
                
            
        
    

    public static void initRule()
       ............
    


​ 这里我们的一些方法,ParamArray数组可以理解为传给doSomething(String name)方法的入参name的3个不同的值。

param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------Blocked
param C ---------Blocked

​ 我们可以看到3个参数每种只能通过3次。同时这里又引申到另一个问题,我如果想设置参数的某个值为另外的count呢,这个时候我们需要增加另一部分逻辑:

public static void initRule()
        ParamFlowRule paramFlowRule = new ParamFlowRule();
        paramFlowRule.setResource(Resource_Key);
        paramFlowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        paramFlowRule.setCount(3);
        //允许的最大突发请求
//        paramFlowRule.setBurstCount(10);
        paramFlowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        //统计的周期
        paramFlowRule.setDurationInSec(1);

        paramFlowRule.setParamIdx(0);

        //例外的参数配置
        ParamFlowItem paramFlowItem = new ParamFlowItem();
        paramFlowItem.setCount(4);
        //参数类型
        paramFlowItem.setClassType(String.class.getTypeName());
        //具体的参数值
        paramFlowItem.setObject(ItemParamKey);
        List<ParamFlowItem> paramFlowItemList = new ArrayList<>();
        paramFlowItemList.add(paramFlowItem);
        paramFlowRule.setParamFlowItemList(paramFlowItemList);

        List<ParamFlowRule> paramFlowRules = new ArrayList<>();
        paramFlowRules.add(paramFlowRule);
        ParamFlowRuleManager.loadRules(paramFlowRules);
    

​ 也就是在ParamFlowRule增加ParamFlowItem,用它来控制特殊的值的控制。这里我们将ParamArray[1]也就是B控制的流量为4:

param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------doSomething
param B ---------doSomething
param C ---------doSomething
param A ---------Blocked
param B ---------doSomething
param C ---------Blocked

​ 我们可以看到这次param B第4次是能通过的。

三、源码简单梳理

​ 下面我们来简单梳理下源码里面的处理:

1、ParamFlowSlot

​ 处理热点参数的SlotParamFlowSlot

public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> 

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable 
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) 
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        

        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    

​ 其首先是判断当前资源有没有设置对应的规则,如果没有就直接放行运行fireEntry,而不会checkFlow

public static boolean hasRules(String resourceName) 
    List<ParamFlowRule> rules = PARAM_FLOW_RULES.get(resourceName);
    return rules != null && !rules.isEmpty();

​ 这个目前就有我们设置的ParamFlowRule,然后就会check

void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException 
    if (args == null) 
        return;
    
    if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) 
        return;
    
    List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

    for (ParamFlowRule rule : rules) 
        applyRealParamIdx(rule, args.length);

        // Initialize the parameter metrics.
        ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

        if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) 
            String triggeredParam = "";
            if (args.length > rule.getParamIdx()) 
                Object value = args[rule.getParamIdx()];
                triggeredParam = String.valueOf(value);
            
            throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
        
    

​ 主要就是对ParamFlowRule的判断,不通过就抛出ParamFlowException(BlockException的子类)

2、ParamFlowChecker

public final class ParamFlowChecker 

    public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                             Object... args) 
        if (args == null) 
            return true;
        
        int paramIdx = rule.getParamIdx();
        if (args.length <= paramIdx) 
            return true;
        

        // Get parameter value.
        Object value = args[paramIdx];
		..........

        return passLocalCheck(resourceWrapper, rule, count, value);
    
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                    Object value) 
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) 
        if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) 
            return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
         else 
            return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
        
     else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) 
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
        if (exclusionItems.contains(value)) 
            int itemThreshold = rule.getParsedHotItems().get(value);
            return ++threadCount <= itemThreshold;
        
        long threshold = (long)rule.getCount();
        return ++threadCount <= threshold;
    

    return true;

​ 再之后这里的判断就会看你是哪种控制行为,进行不通的判断,我们当前是走的passDefaultLocalCheck

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                     Object value) 
    ParameterMetric metric = getParameterMetric(resourceWrapper);
    CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
    CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);

    if (tokenCounters == null || timeCounters == null) 
        return true;
    

    // Calculate max token count (threshold)
    Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
    long tokenCount = (long)rule.getCount();
    if (exclusionItems.contains(value)) 
        tokenCount = rule.getParsedHotItems().get(value);
    

    if (tokenCount == 0) 
        return false;
    

    long maxCount = tokenCount + rule.getBurstCount();
    if (acquireCount > maxCount) 
        return false;
    

    while (true) 
        ......	
    

​ 这里我们注意两个计算tokenCounterstimeCounterstokenCounters记录是参数当前还能获取到的令牌树,timeCounters是记录时间段的开始时间。然后可以看到long maxCount = tokenCount + rule.getBurstCount();,最大许可数是使用到了BurstCount

然后下面就是具体的计算统计。这里我们简单梳理其中一个情况吧:

while (true) 
    long currentTime = TimeUtil.currentTimeMillis();

    AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
 	if (lastAddTokenTime == null) 
                // Token never added, just replenish the tokens and consume @code acquireCount immediately.
        tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
        return true;
      
    long passTime = currentTime - lastAddTokenTime.get();
    // A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
    if (passTime > rule.getDurationInSec() * 1000) 
        AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
        if (oldQps == null) 
            // Might not be accurate here.
            lastAddTokenTime.set(currentTime);
            return true;
         else 
            long restQps = oldQps.get();
            long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
            long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                : (restQps + toAddCount - acquireCount);

            if (newQps < 0) 
                return false;
            
            if (oldQps.compareAndSet(restQps, newQps)) 
                lastAddTokenTime.set(currentTime);
                return true;
            
            Thread.yield();
        
     else 
        AtomicLon

以上是关于Sentinel的基本使用-热点参数限流的主要内容,如果未能解决你的问题,请参考以下文章

Sentinel的基本使用-热点参数限流

手把手带你领略双十一背后的核心技术Sentinel之热点参数限流

使用Sentinel实现热点参数限流

SpringCloudSpring Cloud Alibaba 之 Sentinel热点参数限流与系统自适应限流(三十二)

SpringCloudSpring Cloud Alibaba 之 Sentinel热点参数限流与系统自适应限流(三十二)

Sentinel限流原理(基于Sentinel1.8.1),限流熔断热点参数限流授权实现原理