Dubbo限流源码分析
Posted codeImport
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo限流源码分析相关的知识,希望对你有一定的参考价值。
Dubbo的配置丰富,功能强大,但是网上的教程多为复制粘贴,鱼龙混杂。本着不轻信,不盲从,眼见为实的态度,本系列教程将从源代码的角度分析各配置项的作用。
1.Dubbo限流该怎么设置
dubbo有多种限流方式,可以使用以下参数进行多维度的限流:
1.accepts:服务端最大可接受连接数,可以理解为可以接受的最大消费者数;2.connections:每个Reference开启的连接数;3.actives:消费端控制每个接口的最大并发数;4.executes:服务端控制每个接口的最大并发数;
2.accepts
2.1.accepts是什么?
Provider配置最大可接受连接数,这是项目级别设置。
比如一个Provider设置了accepts=2,该Provider3个消费者分别为C1,C2,C3。
假如这3个消费者的启动顺序为C1,C2,C3,则C3会无法启动,因为服务已经达到了
最大连接数限制;
2.2.accepts该如何配置?
dubbo.provider.accepts=3
2.3.accepts源码分析
在消费者端启动时,会生成一条Netty连接,服务端此时会判断服务端接受的连接数是否已经大于accepts,如果大于,则会拒绝该连接。
//此代码在AbstractServer中
public void connected(Channel ch) throws RemotingException {
if (!this.isClosing() && !this.isClosed()) {
Collection<Channel> channels = this.getChannels();
//判断服务端连接数
if (this.accepts > 0 && channels.size() > this.accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + this.accepts);
ch.close();
} else {
super.connected(ch);
}
} else {
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
}
}
3.connections
3.1.connectons是什么?
每个Reference开启的长连接数,默认是0,表示所有的Reference共享同一条连接;如果大于0,则单独为此Reference设置connections条长连接。
比如同一个项目有3个reference:
@Reference(connections=3) HelloService helloService;
@Reference TestService testService;
@Reference FooService fooService;
则该项目会生成4条连接,其中helloService有3条,testService与fooService共用一条
3.2.connections该如何使用
@Reference(connections=3)
3.3.connections源码分析
在Reference获取ExchangeClient的时候,会判断Reference是否设置了connections参数,如果是则生成相应的ExchangeClient(每个ExchangeClient包含一条连接)
//该段代码在DubboProtocol中
//该方法会在进行refer的时候调用
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
//如果connections为0,则获取共享的连接,共享的连接数为1
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
//如果设置了connections则新生成足够的连接
clients[i] = initClient(url);
}
}
return clients;
}
4.actives
4.1.actives是什么?
Consumer端每个接口的最大并发数,默认是0,如果是0则没有限制。
4.2.actives该如何使用?
@Reference(actives = 3)
4.3.actives源码分析
如果actives大于0,则在Consumer端调用链会加入ActiveLimitFilter过滤器,每次调用前都会判断该接口是否超出了最大并发数,如果超过会等待timeout时间,超时会抛出异常。
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (!RpcStatus.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
synchronized (rpcStatus) {
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
rpcStatus.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Waiting concurrent invoke timeout in client-side for service: " +
invoker.getInterface().getName() + ", method: " + invocation.getMethodName() +
", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
}
}
}
}
invocation.setAttachment(ACTIVELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
return invoker.invoke(invocation);
}
5.executes
5.1.executes是什么?
Provider端每个接口的最大并发数,默认是0,如果是0则没有限制。
5.2.executes如何使用?
@Service(executes = 3)
5.3.executes源码分析
如果executes大于0,则在Provider端调用链会加入ExecuteLimitFilter过滤器,每次Provider接到请求,都会将该接口的并发数+1并判断是否大于executes,如果是则直接抛出RpcException。
(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter extends ListenableFilter {
private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";
public ExecuteLimitFilter() {
super.listener = new ExecuteLimitListener();
}
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
//Dubbo会给每个接口维护一个当时的并发调用数,如果executes大于0,则每次都会判断并发数是否超出了限制
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
6.数据流程图
如果对上面提到的概念比较模糊,可以看下面这张流程图。
References
[1]
并发控制: http://dubbo.apache.org/zh-cn/docs/user/demos/concurrency-control.html[2]
连接控制: http://dubbo.apache.org/zh-cn/docs/user/demos/config-connections.html
近期热文
Dubbo配置参数源码解析-group
以上是关于Dubbo限流源码分析的主要内容,如果未能解决你的问题,请参考以下文章