自己动手写0.5个RPC框架
Posted K字的研究
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己动手写0.5个RPC框架相关的知识,希望对你有一定的参考价值。
大家好, 我是jkl.
众所周知, 写一个rpc框架非常简单. 网上有无数的培训班号称可以教人手写. 甚至, 曾经有人宣称,只需要百十行代码. 而且,宣称这点的大佬, 曾经写过一个小大有名气的框架叫dubbo, 更是增加了这话的可信度.
今天, 也来凑凑热闹. 试试看, 我能不能能写出来. 还是说,这是一个小马过河的问题, 他写两百是因为他会写,我们不行.今天也来做个试验,挑战一下,是不是和大佬一样,只用标准库就可以写出来.
首先, 先分析下结构. 一个最简单的RPC框架, 理论上要包括4部分.
-
导出provider的功能 -
链接consumer的功能 -
监听请求 -
执行请求的功能
先从最简单的部分开始.
首先, 准备测试用的辅助.
-
一个接口(DemoService.java) -
一个实现(DemoServiceImpl.java)
public interface DemoService {
public Integer square(Integer x);
}
public class DemoServiceImpl implements DemoService {
public Integer square(Integer x) {
return x * x;
}
}
接下来, 暴露一个服务
当我们收到一个请求, 要调用一个叫DemoService
的接口, 接下来框架会去调用实现类. 说明, 接口或者接口名是和类实现绑定的. 绑定操作,最简单的方式, 就是Map
.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RPC {
static Map<String, Object> map = new ConcurrentHashMap<>();
public static <T extends S, S> void export(Class<S> s, T impl) {
map.put(s.getName(), impl);
}
}
好了, 暴露一个服务的方法算是设计完了.
再来, 是执行请求.
网络先放放,网络苦手,最难的放最后.
先假设, 已经收到了请求, 要求执行DemoService
的square
方法,参数是Integer
类型:3
.
那么现在就有了
-
className 类名 -
methodName 方法名 -
parameterType 参数类型 -
parameter 参数
看上去, 这个特别像反射的场景. 所以, 结果就出来了.
private static Object call(String clazz,String method,Class[]types,
Object[]param)throws Exception{
Object impl=map.get(clazz);
return impl.getClass().getMethod(method,types)
.invoke(impl,param);
}
接下来改写消费者端了
因为注册表(registry)属于高级功能, 这次这里不写,只用直连. 所以, 使用时候要用到
-
目标ip, -
端口, -
接口类.
只有一个接口, 却能够调用. 这里要用到,java的代理技术.
public class RPC {
public static <S> S refer(Class<S> s, String host, Integer port) throws IOException {
InvocationHandler invocationHandler = (proxy, method, args) -> null;
Object p = Proxy.newProxyInstance(s.getClassLoader(), new Class[]{s}, invocationHandler);
return (S) p;
}
}
发送请求
这里就直接用java基础教程里面的socket请求. 改造下, 搭配上ObjectStream的读写.
-
要先写一个字符串 DemoService.square
-
然后再写一个 Class[]
-
再写一个 Object[]
这么混合起来.就得到了调用方法. 上面invocationHandler
的实现部分.
InvocationHandler invocationHandler=(proxy,method,args)->{
Socket socket=new Socket(host,port);
try(InputStream is=socket.getInputStream();
OutputStream os=socket.getOutputStream();
ObjectOutputStream oos=new ObjectOutputStream(os);
ObjectInputStream ois=new ObjectInputStream(is)){
oos.writeUTF(s.getName()+"."+method.getName());
oos.writeObject(method.getParameterTypes());
oos.writeObject(args);
oos.flush();
return ois.readObject();
}
};
暴露方法有了, 获取代理的消费也有了.执行也有了. 还剩下网络监听server没写.
网络监听
这里还是继续抄写java官网的socket server
直接用最后一个例子, 多客户端模式
while(true){
accept a connection;
create a thread to deal with the client;
}
写好之后, 大概是这个样子.
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RPC {
static Map<String, Object> map = new ConcurrentHashMap<>();
public static <T extends S, S> void export(Class<S> s, T impl) {
map.put(s.getName(), impl);
}
private static Object call(String clazz, String method, Class[] types, Object[] param) throws Exception {
Object impl = map.get(clazz);
return impl.getClass().getMethod(method, types)
.invoke(impl, param);
}
public static <S> S refer(Class<S> s, String host, Integer port) throws IOException {
InvocationHandler invocationHandler = (proxy, method, args) -> {
Socket socket = new Socket(host, port);
try (InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
ObjectInputStream ois = new ObjectInputStream(is)) {
oos.writeUTF(s.getName() + "." + method.getName());
oos.writeObject(method.getParameterTypes());
oos.writeObject(args);
oos.flush();
return ois.readObject();
}
};
return (S) Proxy.newProxyInstance(s.getClassLoader(), new Class[]{s},
invocationHandler);
}
public static void bind(Integer port) throws IOException {
final ServerSocket serverSocket = new ServerSocket(port);
while (true) {
final Socket socket = serverSocket.accept();
new Thread(() -> {
try (ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream())) {
final String req = ois.readUTF();
final int dot = req.lastIndexOf(".");
final String clazz = req.substring(0, dot);
final String method = req.substring(dot + 1);
final Class[] types = (Class[]) ois.readObject();
final Object[] params = (Object[]) ois.readObject();
final Object result = call(clazz, method, types, params);
oos.writeObject(result);
oos.flush();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
验证
写两个main方法来试用一下.
暴露
public static void main(String[]args)throws IOException{
RPC.export(DemoService.class,new DemoServiceImpl());
RPC.bind(3333);
}
消费
public static void main(String[]args)throws IOException{
final DemoService service=RPC.refer(DemoService.class,"localhost",3333);
for(int i=0;i< 100;i++){
System.out.println("service.square(i) = "+service.square(i));
}
}
结果
service.square(i) = 8649
service.square(i) = 8836
service.square(i) = 9025
service.square(i) = 9216
service.square(i) = 9409
service.square(i) = 9604
service.square(i) = 9801
总结
好容易写完, 没超过100行. 好吧, 我对了下答案, 相比梁老的写法, 我漏了很多校验. 还有异常捕获. 为了在微信排版还用了很多短变量. 其实问题很大,不过总算也是成功了. 还是总结下.
这玩意目前缺点很明显, 每一次调用开一次socket,首先就是不能接受的. dubbo中, 这部分传输, 用的是netty重写的.
这里可以对照一下dubbo的广告, 俗称6大功能.
-
面向接口代理的高性能RPC调用, -
智能容错和负载均衡, -
服务自动注册和发现, -
高度可扩展能力, -
运行期流量调度, -
可视化的服务治理与运维。
这里其实只写了第一个功能的一半, 基于接口的RPC调用. 性能是一点都不高.后面的功能全都没有. dubbo是多么强大, 可以想见.
不过呢, 本次虽然是个玩具,用到的基础知识还是挺多的.拉个清单吧.
-
泛型 -
接口与继承 -
反射 -
代理 -
socket编程 -
IO流操作 -
对象序列化 -
自定义协议 (把类名和方法名拼到一起也算是协议)
回头再好好聊聊Dubbo, 今天时间不够了.
References
[1]
梁老的例子: https://blog.csdn.net/u010805542/article/details/83970611[2]
socket请求: https://docs.oracle.com/javase/tutorial/networking/sockets/readingWriting.html[3]
socket server: https://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html
以上是关于自己动手写0.5个RPC框架的主要内容,如果未能解决你的问题,请参考以下文章
自己动手写RPC框架到dubbo的服务动态注册,服务路由,负载均衡功能实现
自己动手写RPC框架有那么难吗?这次我设计了一款TPS百万级别的分布式高性能可扩展的RPC框架(赶快收藏)