Dubbo消费端同步调用异步调用(基于Dubbo3)
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo消费端同步调用异步调用(基于Dubbo3)相关的知识,希望对你有一定的参考价值。
上一节,我们大概浏览了一下Dubbo消费端的启动流程Dubbo消费端启动流程、处理逻辑,方法调用实现(基于Dubbo3)
我们知道Dubbo底层的网络通信是基于Netty的,而Netty一般是基于异步IO,那我们如果需要同步调用、异步调用,Dubbo中是怎么实现的呢?
我们回到DubboInvoker.doInvoke
中,这里是关键:
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
}
}
这里isOneway
来识别调用方法是否有返回值,如果没有返回值的,那么默认就是异步调用
,在没有返回值异步调用情况下,还会判断DubboReference -> sent
参数,默认是false,如果sent是true的话,那么会发送消息之后等待消息发送成功之后再返回,如果在等待时间内未发送成功,则返回失败:
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
removeChannelIfDisconnected(channel);
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
这种不需要返回值,Dubo直接异步调用,同时返回了一个AsyncRpcResult
,其值为null。
对于我们在程序里指定同步还是异步调用,在DubboReference -> async (默认为false)
指定是同步还是异步调用,对于同步调用,Dubbo中封装了一个
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension()
.getExecutor(url);
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
return new ThreadlessExecutor(sharedExecutor);
} else {
return sharedExecutor;
}
}
如果是同步调用,这里返回的是一个封装的ThreadlessExecutor
类,然后执行request
,最终在HeaderExchangeChannel
实现:
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
返回了一个DefaultFuture
,然后通过FutureContext.getContext().setCompatibleFuture(appResponseFuture);
和当前线程绑定。封装到AsyncRpcResult
返回,而在调用doInvoker
的inovke
方法中,会根据请求是同步还是异步进行判断等待:
public Result invoke(Invocation inv) throws RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
prepareInvocation(invocation);
AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
if (InvokeMode.SYNC != invocation.getInvokeMode()) {
return;
}
try {
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable rootCause = e.getCause();
if (rootCause instanceof TimeoutException) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else if (rootCause instanceof RemotingException) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} else {
throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
} catch (Throwable e) {
throw new RpcException(e.getMessage(), e);
}
}
如果不是同步调用,这里不会等待,而同步调用则会在这里等待,
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
如果没有返回值的话,这里会一直等。
那么Dubbo是怎么知道请求有返回的呢,答案还是在之前的DefaultFuture
中。
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
实际上Dubbo中每次请求都会有一个唯一的ID,请求和相应都会带着这个ID。在生成DefaultFuture
会将这个DefaultFuture通过id放到一个map中。
而当有响应返回的时候,则会从
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}
// the result is returning, but the caller thread may still waiting
// to avoid endless waiting for whatever reason, notify caller thread to return.
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
这样,当有响应返回的时候就能够获取到对应的结果。
而如果是异步调用,返回的也是一个AsyncRpcResult
,都是通过getValue
获取返回值,但是对于异步调用,返回的只是一个空的:
// 同步模式下,有返回值
String result = service.sayHello();
// 异步模式下,这个返回值是空的
String result = service.sayHello();
// 异步模式下需要通过这种方式来获取返回值
RpcContext.getContext().getFuture().get()
以上是关于Dubbo消费端同步调用异步调用(基于Dubbo3)的主要内容,如果未能解决你的问题,请参考以下文章