rpc回调的几种设计方式

Posted littleschemer

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rpc回调的几种设计方式相关的知识,希望对你有一定的参考价值。

传统的http是基于请求-响应模式。也就是说,客户端(浏览器)主动发起请求,服务器才会给予返回值。

类似于http,RPC(RemoteProcedureCall)的远程方法调用的返回值,也是一种请求-响应模式。RPC提供类似于下面的API来获取方法的返回值。

public Response request(Message req) 
      // do sth
    return new Response();

本文主要是探讨rpc回调函数的三种设计模式

1. 基于同步阻塞的模式

第一种方式,也是最直观的模式,发起方调用API的时候,会同步阻塞当前线程直到获取响应消息。当然,为了处理各种突发事件,会设置超时参数。下面先给一个demo,然后再讨论逻辑过程。

 /**
     * 发送消息并返回执行结果
     * @param session
     * @param request
     * @return
     */
    public Message request(Session session, G2FCallBack request) throws InterruptedException, CallTimeoutException 
        int timeout = 5000;
        int index = request.getIndex();
        session.sendMessage(request);
        final RequestResponseFuture future = new RequestResponseFuture(index,  timeout,null);
        try 
            CallBackService.getInstance().register(index, future);
            Message responseMessage = future.waitResponseMessage(timeout);
            if (responseMessage == null) 
                CallTimeoutException exception = new CallTimeoutException("send request message  failed");
                future.setCause(exception);
                throw exception;
             else 
                return responseMessage;
            
         catch (InterruptedException e) 
            future.setCause(e);
            throw e;
         finally 
            CallBackService.getInstance().remove(index);
        
    

首先需要说明的一点是,由于请求发起方与函数处理方处于两个不同的进程。为了将两个消息关联在一起。需要请求方带上一个唯一id,这个id不用全局唯一,只需要在发起方内部唯一即可。

算法大概可以分为下面几个步骤:

1. 消息发送之前,生成一个唯一ID,同时注册一个回调接口。将这两者绑定缓存起来

2. 消息发送后,阻塞当前线程,等待响应结果。可以利用CountDownLatch的wait(timeout)方法。

3. 当远程进程返回结果,利用响应包的关联ID找到已注册的回调方法,执行CountDownLatch的countDown()方法,即可唤醒请求方的线程;若在指定时间内无法获得回调消息,则代表请求超时,抛出超时受检异常。

2. 基于异步操作的回调函数

在第一种模式下,以RPC的风格获得回调结果,代码比较优雅。但有一个致命的缺点,执行远程方法的时候,会阻塞当前线程。极端情况下,远程不可达,则每次调用都会阻塞到超时结束。

写过浏览器客户端逻辑的同学,应该对javascript的ajax函数很熟悉。

$.ajax(
    url:"XXXX",
    type: "GET",
    data: username:$("#username").val(),
    dataType: "json",
    complete:function(msg) 
        //请求完成后调用的回调函数(请求成功或失败时均调用)
     , 
    error:function(msg) 
        //请求失败时被调用的函数 
     , 
    Sucess:function(msg) 
        //请求成功后调用的回调函数 
     
);

注册回调函数之后,即可在获取服务端返回值的时候自动执行回调函数的逻辑。

类似地,我们的rpc回调也可以设计成ajax的模式,呈现出下面的API

    public void request(Session session, G2FCallBack request, RequestCallback callBack) 
        int timeout = 5000;
        int index = request.getIndex();
        final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(index, timeout, callBack);
        CallBackService.getInstance().register(index, requestResponseFuture);
        session.sendMessage(request);
    

请求方调用requset()方法后,不会阻塞当前线程。当远程消息返回的时候,自动执行回调函数。

    public void fillCallBack(int index, Message message) 
        RequestResponseFuture future = remove(index);
        if (future == null) 
            LoggerUtils.error("回调信息丢失 msg ", message);
            return;
        
        RequestCallback callback = future.getRequestCallback();
        if (callback != null) 
            callback.onSuccess(message);
        
        if (future != null) 
            future.putResponseMessage(message);
        
    

3. 基于Promise的回调函数

第二种模式其实已经很完美了,但还是会有一个瑕疵。熟悉javascript的同学,肯定对“回调地狱”这个名次不陌生,说的是,当javascript使用层层嵌套的回调函数后带来的设计灾难,代码惨不忍睹。于是,Promise应运而生。有了 Promise 对象,就可以将异步操作以同步操作的流程表达出来,避免了层层嵌套的回调函数。

const fs=require('fs')

function getPromise(filename)
    return new Promise((resolve,reject)=>
    //异步操作读文件
    fs.readFile(`//demo.txt`,'utf-8',(err,data)=>
        if(err)
            //读取失败
            reject(err)  
        else
            //读取成功
            resolve(data)
        
    )
)


getPromise('a').then((data)=>
    console.log(data)
    return getPromise('b')
).then((data)=>
    console.log(data)
    return getPromise('c')
).then((data)=>
    console.log(data)

Java的线程池虽然提供了异步执行任务的接口,但遗憾的是该方法返回的Future对象,如果想获取执行结果,要么通过轮训检查结果,要么简单粗暴通过调用get()方法阻塞后获得结果。

<T> Future<T> submit(Callable<T> task);

基于此缺点。guava第三方库提供了ListenableFuture接口,Netty提供了PromiseTask接口,本质都是通过监听器避免阻塞调用。其实JDK的CompletableFuture也能实现目的。

public static CompletableFuture<RpcCallbackResponse> sendToCenter(RpcCallbackRequest message) 
        RpcClientRouter clientRouter = GameContext.getBean(RpcClientRouter.class);
        IdSession session = clientRouter.getCenterSession();
      
        int index = idFactory.getAndIncrement();
        message.setCallbackId(index);

        CompletableFuture future = new CompletableFuture();
        Callback callback = new Callback();
        callback.setFuture(future);
        CallbackHandler.registerCallback(index, callback);

        ScheduledFuture timeout = GameContext.getSchedulerManager().schedule(() -> 
            LoggerUtils.error("跨服消息[]回调超时", message.getClass().getSimpleName());
            CallbackHandler.removeCallback(index);
            callback.getFuture().completeExceptionally(new RpcTimeoutException());
        , Callback.TIME_OUT);
        callback.setTimeout(timeout);

        session.sendPacket(message);

        return future;
    

客户端代码,非常优雅 ^ - ^

RpcReqHello req = new RpcReqHello();
req.setContent("hello, game");

CrossMessageUtil.sendToCenter(req).thenAccept(m -> 
    System.out.println(m);
);

总结下三种模式的优缺点

优点缺点
同步阻塞客户端代码调用优雅会阻塞当前线程
异步回调异步不阻塞当前线程嵌套回调代码丑陋
promise初学者不好理解嵌套回调代码优雅

以上是关于rpc回调的几种设计方式的主要内容,如果未能解决你的问题,请参考以下文章

rpc回调的几种设计方式

rpc回调的几种设计方式

探讨一下实现幂等性的几种方式

探讨确保消息消费幂等性的几种方式

对外提供服务的几种方式

远程通信的几种选择