Sentinel限流原理(基于Sentinel1.8.1),限流熔断热点参数限流授权实现原理

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Sentinel限流原理(基于Sentinel1.8.1),限流熔断热点参数限流授权实现原理相关的知识,希望对你有一定的参考价值。

在 Sentinel 里面,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责,例如:

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
    总体的框架如下:

    上面内容来自Sentinel官网给出的内容,接下来我们看看Sentinel源码层面是怎么实现逻辑的?
    一般Sentinel限流都是通过:
Entry entry = SphU.entry('entryName');

这个Entry相当于是获取到了一个令牌,如果能够获取到这个令牌,表示可以通过,能够访问资源。
在Sentinel中有几个比较重要的概念:

  • Entry 代表的是一个令牌,如果能够通过,则获取到entry不为空
  • Context 代表的则是一次请求的上下文
  • Node 代表的则是一次请求、一个资源、一个节点集群的请求调用信息记录

当执行SphU.entry的时候,会访问:

// SphU.java
public static Entry entry(String name) throws BlockException 
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    
	// CtSph.java
    StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException 
        return entryWithPriority(resourceWrapper, count, false, args);
    
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException 
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) 
            // The @link NullContext indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        

        if (context == null) 
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) 
            return new CtEntry(resourceWrapper, null, context);
        

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) 
            return new CtEntry(resourceWrapper, null, context);
        

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try 
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
         catch (BlockException e1) 
            e.exit(count, args);
            throw e1;
         catch (Throwable e1) 
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        
        return e;
    

这里

  • 首先会获取到当前线程上线文的执行环境Context
  • 然后获取到ProcessorSlot执行链,执行ProcessorSlot.entry
  • 如果上一步执行成功,表示能够访问,返回CtEntry,否则抛出异常
    另外需要注意的是,对于资源,在Sentinel抽象成了ResourceWrapper,并重写了equals和hashCode方法
@Override
    public int hashCode() 
        return getName().hashCode();
    

    /**
     * Only @link #getName() is considered.
     */
    @Override
    public boolean equals(Object obj) 
        if (obj instanceof ResourceWrapper) 
            ResourceWrapper rw = (ResourceWrapper)obj;
            return rw.getName().equals(getName());
        
        return false;
    

只要资源的名称一样,这就是同一个资源
我们首先来看下获取Context:

public static Context getContext() 
        return contextHolder.get();

这里的contextHolder是一个ThreadLocal<Context>变量,初始的时候肯定是空的,
所以开始肯定会走context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);逻辑:

protected static Context trueEnter(String name, String origin) 
        Context context = contextHolder.get();
        if (context == null) 
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) 
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) 
                    setNullContext();
                    return NULL_CONTEXT;
                 else 
                    LOCK.lock();
                    try 
                        node = contextNameNodeMap.get(name);
                        if (node == null) 
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) 
                                setNullContext();
                                return NULL_CONTEXT;
                             else 
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                Constants.ROOT.addChild(node);
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            
                        
                     finally 
                        LOCK.unlock();
                    
                
            
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        
        return context;
    

这里需要注意的是在ContextUtil代码加载的时候会执行一段静态代码:

private static void initDefaultContext() 
        String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
        EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
        Constants.ROOT.addChild(node);
        contextNameNodeMap.put(defaultContextName, node);
    

而这里的参数nameConstants.CONTEXT_DEFAULT_NAME,所以,开始的时候即使Context为null,node = contextNameNodeMap.get(name);也不为null,是一个EntranceNode, 即默认情况下,每个Context初始的时候node都为EntranceNode.

这样,就得到了Context。

接下来就是获取执行链路ProcessorSlot:

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) 
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) 
            synchronized (LOCK) 
                chain = chainMap.get(resourceWrapper);
                if (chain == null) 
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) 
                        return null;
                    
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                
            
        
        return chain;
    
public static ProcessorSlotChain newSlotChain() 
        if (slotChainBuilder != null) 
            return slotChainBuilder.build();
        

        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder == null) 
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
         else 
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: ",
                slotChainBuilder.getClass().getCanonicalName());
        
        return slotChainBuilder.build();
    

这里也是通过Spi机制获取,在META-INF.services下面,有这个几个文件SPI会用到,

这里首先会获取一个SlotChainBuilder,默认获取到的就是DefaultSlotChainBuilder,
DefaultSlotChainBuilder会加载com.alibaba.csp.sentinel.slotchain.ProcessorSlot里面的类,Sentinel中默认提供了如下实现:

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

这里加载完之后,会根据ProcessorSlot的注解的order属性进行从大到小的排序,默认几个实现的排序大小大家可对下:

public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
    public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
    public static final int ORDER_LOG_SLOT = -8000;
    public static final int ORDER_STATISTIC_SLOT = -7000;
    public static final int ORDER_AUTHORITY_SLOT = -6000;
    public static final int ORDER_SYSTEM_SLOT = -5000;
    public static final int ORDER_FLOW_SLOT = -2000;
    public static final int ORDER_DEGRADE_SLOT = -1000;

然后这里有一点需要注意,Sentinel中,每个资源会对应一组ProcessorSlot,在这些ProcessorSlot有很多类实例变量,只会记录该资源的信息,,而有些则是全局的,属于整个节点的

NodeSelectorSlot

接下来开始执行chain.entry(context, resourceWrapper, null, count, prioritized, args);,
这里的chain是一个DefaultProcessorSlotChain,这个里面只有了上面加载的ProcessorSlot的链表,最终会从第一个ProcessorSlot往后执行,首选in执行的是NodeSelectorSlot:

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable 
        DefaultNode node = map.get(context.getName());
        if (node == null) 
            synchronized (this) 
                node = map.get(context.getName());
                if (node == null) 
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    ((DefaultNode) context.getLastNode()).addChild(node);
                

            
        
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    

这里第一次的时候node=null,新建了一个DefaultNode,注意,这里的NodeSelectorSlot不是一个单例,而是每个资源都有一个
然后往后面传递执行的时候,传递的是生成的这个node。另外这里context.getName(),如果没有特别执行,每个context.getName()返回的都是Constants.CONTEXT_DEFAULT_NAME.

ClusterBuilderSlot

ClusterBuilderSlot主要逻辑如下:

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable 
        if (clusterNode == null) 
            synchronized (lock) 
                if (clusterNode == null) 
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);
                    clusterNodeMap = newMap;
                
            
        
        node.setClusterNode(clusterNode);
        if (!"".equals(context.getOrigin())) 
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    

同样这里的clusterNode也是该资源全局一个。接着完后执行,传递的仍是NodeSelectorSlot中的DefaultNode

StatisticSlot

StatisticSlot的作用是记录每个资源的请求情况。

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable 
        try 
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (resourceWrapper.getEntryType() == EntryType.IN) 
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) 
                handler.onPass(context, resourceWrapper, node, count, args);
            
         catch (PriorityWaitException ex) 
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) context.getCurEntry().getOriginNode().increaseThreadNum();
            

            if (resourceWrapper.getEntryType() == EntryType.IN) 
                Constants.ENTRY_NODE.increaseThreadNum();
            
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()

以上是关于Sentinel限流原理(基于Sentinel1.8.1),限流熔断热点参数限流授权实现原理的主要内容,如果未能解决你的问题,请参考以下文章

sentinel 限流熔断神器详细介绍

sentinel 限流熔断神器详细介绍

sentinel1.8自定义错误信息

sentinel1.8自定义错误信息

SpringCoudAlibaba之Sentinel下-底层篇

SpringCoudAlibaba之Sentinel下-底层篇