Dubbo限流源码分析

Posted codeImport

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo限流源码分析相关的知识,希望对你有一定的参考价值。

Dubbo的配置丰富,功能强大,但是网上的教程多为复制粘贴,鱼龙混杂。本着不轻信,不盲从,眼见为实的态度,本系列教程将从源代码的角度分析各配置项的作用。

1.Dubbo限流该怎么设置

dubbo有多种限流方式,可以使用以下参数进行多维度的限流:

1.accepts:服务端最大可接受连接数,可以理解为可以接受的最大消费者数;2.connections:每个Reference开启的连接数;3.actives:消费端控制每个接口的最大并发数;4.executes:服务端控制每个接口的最大并发数;

2.accepts

2.1.accepts是什么?

Provider配置最大可接受连接数,这是项目级别设置。

比如一个Provider设置了accepts=2,该Provider3个消费者分别为C1,C2,C3。

假如这3个消费者的启动顺序为C1,C2,C3,则C3会无法启动,因为服务已经达到了

最大连接数限制;

2.2.accepts该如何配置? 

dubbo.provider.accepts=3

2.3.accepts源码分析

在消费者端启动时,会生成一条Netty连接,服务端此时会判断服务端接受的连接数是否已经大于accepts,如果大于,则会拒绝该连接。

    //此代码在AbstractServer中    public void connected(Channel ch) throws RemotingException {        if (!this.isClosing() && !this.isClosed()) {           Collection<Channel> channels = this.getChannels();           //判断服务端连接数           if (this.accepts > 0 && channels.size() > this.accepts) {               logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + this.accepts);               ch.close();           } else {               super.connected(ch); }        } else {           logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");           ch.close(); } }   

3.connections

3.1.connectons是什么?

每个Reference开启的长连接数,默认是0,表示所有的Reference共享同一条连接;如果大于0,则单独为此Reference设置connections条长连接。

比如同一个项目有3个reference: 

@Reference(connections=3) HelloService helloService; 

@Reference TestService testService; 

@Reference FooService fooService; 

则该项目会生成4条连接,其中helloService有3条,testService与fooService共用一条

3.2.connections该如何使用

@Reference(connections=3) 

3.3.connections源码分析

在Reference获取ExchangeClient的时候,会判断Reference是否设置了connections参数,如果是则生成相应的ExchangeClient(每个ExchangeClient包含一条连接)

    //该段代码在DubboProtocol中    //该方法会在进行refer的时候调用    private ExchangeClient[] getClients(URL url) { // whether to share connection        boolean useShareConnect = false;        int connections = url.getParameter(CONNECTIONS_KEY, 0);        List<ReferenceCountExchangeClient> shareClients = null;        // if not configured, connection is shared, otherwise, one connection for one service        if (connections == 0) { useShareConnect = true;            //如果connections为0,则获取共享的连接,共享的连接数为1            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);            shareClients = getSharedClient(url, connections);        }        ExchangeClient[] clients = new ExchangeClient[connections];        for (int i = 0; i < clients.length; i++) {            if (useShareConnect) { clients[i] = shareClients.get(i);            } else { //如果设置了connections则新生成足够的连接 clients[i] = initClient(url);            } }        return clients; }

4.actives

4.1.actives是什么?

Consumer端每个接口的最大并发数,默认是0,如果是0则没有限制。 

4.2.actives该如何使用? 

@Reference(actives = 3)

4.3.actives源码分析 

如果actives大于0,则在Consumer端调用链会加入ActiveLimitFilter过滤器,每次调用前都会判断该接口是否超出了最大并发数,如果超过会等待timeout时间,超时会抛出异常。

    @Override    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {        URL url = invoker.getUrl();        String methodName = invocation.getMethodName();        int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);        final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());        if (!RpcStatus.beginCount(url, methodName, max)) {            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);            long start = System.currentTimeMillis();            long remain = timeout;            synchronized (rpcStatus) {                while (!RpcStatus.beginCount(url, methodName, max)) {                    try { rpcStatus.wait(remain);                    } catch (InterruptedException e) { // ignore }                    long elapsed = System.currentTimeMillis() - start;                    remain = timeout - elapsed;                    if (remain <= 0) {                        throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION, "Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + rpcStatus.getActive() + ". max concurrent invoke limit: " + max); } } } }
        invocation.setAttachment(ACTIVELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        return invoker.invoke(invocation); }

5.executes

5.1.executes是什么?

Provider端每个接口的最大并发数,默认是0,如果是0则没有限制。

5.2.executes如何使用?

@Service(executes = 3)

5.3.executes源码分析 

如果executes大于0,则在Provider端调用链会加入ExecuteLimitFilter过滤器,每次Provider接到请求,都会将该接口的并发数+1并判断是否大于executes,如果是则直接抛出RpcException。

@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)public class ExecuteLimitFilter extends ListenableFilter {    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";    public ExecuteLimitFilter() {        super.listener = new ExecuteLimitListener();    }    @Override    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {        URL url = invoker.getUrl();        String methodName = invocation.getMethodName();        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);        //Dubbo会给每个接口维护一个当时的并发调用数,如果executes大于0,则每次都会判断并发数是否超出了限制        if (!RpcStatus.beginCount(url, methodName, max)) {            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION, "Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); }
invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));        try { return invoker.invoke(invocation);        } catch (Throwable t) {            if (t instanceof RuntimeException) {                throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } }

6.数据流程图

如果对上面提到的概念比较模糊,可以看下面这张流程图。


References

[1] 并发控制: http://dubbo.apache.org/zh-cn/docs/user/demos/concurrency-control.html
[2] 连接控制: http://dubbo.apache.org/zh-cn/docs/user/demos/config-connections.html

近期热文

Dubbo配置参数源码解析-group



以上是关于Dubbo限流源码分析的主要内容,如果未能解决你的问题,请参考以下文章

源码分析Dubbo tps过滤器器实现原理

源码分析 Sentinel 之 Dubbo 适配原理

源码分析 Sentinel 之 Dubbo 适配原理

限流降级方案

dubbo service启动流程源码分析

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?