rpc回调的几种设计方式
Posted littleschemer
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rpc回调的几种设计方式相关的知识,希望对你有一定的参考价值。
传统的http是基于请求-响应模式。也就是说,客户端(浏览器)主动发起请求,服务器才会给予返回值。
类似于http,RPC(RemoteProcedureCall)的远程方法调用的返回值,也是一种请求-响应模式。RPC提供类似于下面的API来获取方法的返回值。
public Response request(Message req) {
// do sth
return new Response();
}
本文主要是探讨rpc回调函数的三种设计模式
1. 基于同步阻塞的模式
第一种方式,也是最直观的模式,发起方调用API的时候,会同步阻塞当前线程直到获取响应消息。当然,为了处理各种突发事件,会设置超时参数。下面先给一个demo,然后再讨论逻辑过程。
/**
* 发送消息并返回执行结果
* @param session
* @param request
* @return
*/
public Message request(Session session, G2FCallBack request) throws InterruptedException, CallTimeoutException {
int timeout = 5000;
int index = request.getIndex();
session.sendMessage(request);
final RequestResponseFuture future = new RequestResponseFuture(index, timeout,null);
try {
CallBackService.getInstance().register(index, future);
Message responseMessage = future.waitResponseMessage(timeout);
if (responseMessage == null) {
CallTimeoutException exception = new CallTimeoutException("send request message failed");
future.setCause(exception);
throw exception;
} else {
return responseMessage;
}
} catch (InterruptedException e) {
future.setCause(e);
throw e;
} finally {
CallBackService.getInstance().remove(index);
}
}
首先需要说明的一点是,由于请求发起方与函数处理方处于两个不同的进程。为了将两个消息关联在一起。需要请求方带上一个唯一id,这个id不用全局唯一,只需要在发起方内部唯一即可。
算法大概可以分为下面几个步骤:
1. 消息发送之前,生成一个唯一ID,同时注册一个回调接口。将这两者绑定缓存起来
2. 消息发送后,阻塞当前线程,等待响应结果。可以利用CountDownLatch的wait(timeout)方法。
3. 当远程进程返回结果,利用响应包的关联ID找到已注册的回调方法,执行CountDownLatch的countDown()方法,即可唤醒请求方的线程;若在指定时间内无法获得回调消息,则代表请求超时,抛出超时受检异常。
2. 基于异步操作的回调函数
在第一种模式下,以RPC的风格获得回调结果,代码比较优雅。但有一个致命的缺点,执行远程方法的时候,会阻塞当前线程。极端情况下,远程不可达,则每次调用都会阻塞到超时结束。
写过浏览器客户端逻辑的同学,应该对javascript的ajax函数很熟悉。
$.ajax({
url:"XXXX",
type: "GET",
data: {username:$("#username").val()},
dataType: "json",
complete:function(msg){
//请求完成后调用的回调函数(请求成功或失败时均调用)
} ,
error:function(msg){
//请求失败时被调用的函数
} ,
Sucess:function(msg){
//请求成功后调用的回调函数
}
});
注册回调函数之后,即可在获取服务端返回值的时候自动执行回调函数的逻辑。
类似地,我们的rpc回调也可以设计成ajax的模式,呈现出下面的API
public void request(Session session, G2FCallBack request, RequestCallback callBack) {
int timeout = 5000;
int index = request.getIndex();
final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(index, timeout, callBack);
CallBackService.getInstance().register(index, requestResponseFuture);
session.sendMessage(request);
}
请求方调用requset()方法后,不会阻塞当前线程。当远程消息返回的时候,自动执行回调函数。
public void fillCallBack(int index, Message message) {
RequestResponseFuture future = remove(index);
if (future == null) {
LoggerUtils.error("回调信息丢失 msg {}", message);
return;
}
RequestCallback callback = future.getRequestCallback();
if (callback != null) {
callback.onSuccess(message);
}
if (future != null) {
future.putResponseMessage(message);
}
}
3. 基于Promise的回调函数
第二种模式其实已经很完美了,但还是会有一个瑕疵。熟悉javascript的同学,肯定对“回调地狱”这个名次不陌生,说的是,当javascript使用层层嵌套的回调函数后带来的设计灾难,代码惨不忍睹。于是,Promise应运而生。有了 Promise 对象,就可以将异步操作以同步操作的流程表达出来,避免了层层嵌套的回调函数。
const fs=require('fs')
function getPromise(filename){
return new Promise((resolve,reject)=>{
//异步操作读文件
fs.readFile(`//demo.txt`,'utf-8',(err,data)=>{
if(err){
//读取失败
reject(err)
}else{
//读取成功
resolve(data)
}
})
})
}
getPromise('a').then((data)=>{
console.log(data)
return getPromise('b')
}).then((data)=>{
console.log(data)
return getPromise('c')
}).then((data)=>{
console.log(data)
}
Java的线程池虽然提供了异步执行任务的接口,但遗憾的是该方法返回的Future对象,如果想获取执行结果,要么通过轮训检查结果,要么简单粗暴通过调用get()方法阻塞后获得结果。
<T> Future<T> submit(Callable<T> task);
基于此缺点。guava第三方库提供了ListenableFuture接口,Netty提供了PromiseTask接口,本质都是通过监听器避免阻塞调用。其实JDK的CompletableFuture也能实现目的。
public static CompletableFuture<RpcCallbackResponse> sendToCenter(RpcCallbackRequest message) {
RpcClientRouter clientRouter = GameContext.getBean(RpcClientRouter.class);
IdSession session = clientRouter.getCenterSession();
int index = idFactory.getAndIncrement();
message.setCallbackId(index);
CompletableFuture future = new CompletableFuture();
Callback callback = new Callback();
callback.setFuture(future);
CallbackHandler.registerCallback(index, callback);
ScheduledFuture timeout = GameContext.getSchedulerManager().schedule(() -> {
LoggerUtils.error("跨服消息[{}]回调超时", message.getClass().getSimpleName());
CallbackHandler.removeCallback(index);
callback.getFuture().completeExceptionally(new RpcTimeoutException());
}, Callback.TIME_OUT);
callback.setTimeout(timeout);
session.sendPacket(message);
return future;
}
客户端代码,非常优雅 ^ - ^
RpcReqHello req = new RpcReqHello();
req.setContent("hello, game");
CrossMessageUtil.sendToCenter(req).thenAccept(m -> {
System.out.println(m);
});
总结下三种模式的优缺点
优点 | 缺点 | |
同步阻塞 | 客户端代码调用优雅 | 会阻塞当前线程 |
异步回调 | 异步不阻塞当前线程 | 嵌套回调代码丑陋 |
promise | 初学者不好理解 | 嵌套回调代码优雅 |
以上是关于rpc回调的几种设计方式的主要内容,如果未能解决你的问题,请参考以下文章