Dubbo消费端同步调用异步调用(基于Dubbo3)

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo消费端同步调用异步调用(基于Dubbo3)相关的知识,希望对你有一定的参考价值。

上一节,我们大概浏览了一下Dubbo消费端的启动流程Dubbo消费端启动流程、处理逻辑,方法调用实现(基于Dubbo3)
我们知道Dubbo底层的网络通信是基于Netty的,而Netty一般是基于异步IO,那我们如果需要同步调用、异步调用,Dubbo中是怎么实现的呢?
我们回到DubboInvoker.doInvoke中,这里是关键:

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
          
        }
    }

这里isOneway来识别调用方法是否有返回值,如果没有返回值的,那么默认就是异步调用,在没有返回值异步调用情况下,还会判断DubboReference -> sent参数,默认是false,如果sent是true的话,那么会发送消息之后等待消息发送成功之后再返回,如果在等待时间内未发送成功,则返回失败:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

这种不需要返回值,Dubo直接异步调用,同时返回了一个AsyncRpcResult,其值为null。

对于我们在程序里指定同步还是异步调用,在DubboReference -> async (默认为false)指定是同步还是异步调用,对于同步调用,Dubbo中封装了一个

 protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
                .getDefaultExtension()
                .getExecutor(url);
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            return sharedExecutor;
        }
    }

如果是同步调用,这里返回的是一个封装的ThreadlessExecutor类,然后执行request,最终在HeaderExchangeChannel实现:

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

返回了一个DefaultFuture,然后通过FutureContext.getContext().setCompatibleFuture(appResponseFuture);和当前线程绑定。封装到AsyncRpcResult返回,而在调用doInvokerinovke方法中,会根据请求是同步还是异步进行判断等待:

public Result invoke(Invocation inv) throws RpcException {
        RpcInvocation invocation = (RpcInvocation) inv;
        prepareInvocation(invocation);
        AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
        waitForResultIfSync(asyncResult, invocation);
        return asyncResult;
    }
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
        if (InvokeMode.SYNC != invocation.getInvokeMode()) {
            return;
        }
        try {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable rootCause = e.getCause();
            if (rootCause instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (rootCause instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
    }

如果不是同步调用,这里不会等待,而同步调用则会在这里等待,
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);如果没有返回值的话,这里会一直等。

那么Dubbo是怎么知道请求有返回的呢,答案还是在之前的DefaultFuture中。

private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

实际上Dubbo中每次请求都会有一个唯一的ID,请求和相应都会带着这个ID。在生成DefaultFuture会将这个DefaultFuture通过id放到一个map中。
而当有响应返回的时候,则会从

public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    t.cancel();
                }
                future.doReceived(response);
            } else {
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

这样,当有响应返回的时候就能够获取到对应的结果。

而如果是异步调用,返回的也是一个AsyncRpcResult,都是通过getValue获取返回值,但是对于异步调用,返回的只是一个空的:

// 同步模式下,有返回值
String result = service.sayHello();

// 异步模式下,这个返回值是空的
String result = service.sayHello();
// 异步模式下需要通过这种方式来获取返回值
RpcContext.getContext().getFuture().get()

以上是关于Dubbo消费端同步调用异步调用(基于Dubbo3)的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo3高级特性「框架与服务」框架与服务的异步调用实践以及开发模式

服务消费端泛化调用与异步调用

小白也能看懂的dubbo3应用级服务发现详解

Dubbo服务端消费端网络连接数控制

Dubbo服务提供端与消费端应用的搭建

Dubbo3高级特性「框架与服务」Dubbo3客户端和服务端的泛化调用机制体系