手写简易RPC
Posted 天思悦
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写简易RPC相关的知识,希望对你有一定的参考价值。
RPC简介
RPC是远程过程调用(Remote Procedure Call)的缩写形式,是一种远程通信的方式。分布式架构主要是通过把服务分布在不同的计算机节点上面,然后通过远程通信实现数据的交换。也就是订单系统里面需要涉及到用户系统里面的远程接口,这就没有办法避免涉及到远程通信。对于内部服务之间的通信,一般选择RPC通信。
RPCDemo
建立项目
建立两个项目分别为user-service和order-service。其中user-service作为调用端也就是客户端,order-service作为被调用端,也就是服务端。在order-service中建立两个字模块(module)api和service。api负责给user-service提供接口,service负责具体实现。java毕竟是面向接口编程,我们也遵循一下规则。在api的pom文件中,不能有maven-plugin,不然打包成可供依赖的jar包时会有问题。
代码编写
1.首先呢,项目的开始我们就从一个服务端的接口开始开头,我们建立一个接口IOrderService,里面编写两个方法,分别为queryOrderList,queryOrderById。可以让客户端也就是user-service进行调用。具体代码为
public interface IOrderService {
String queryOrderList();
String queryOrderById(int id);
}
然后maven install,把包安装在本地,供其他项目进行依赖和调用
1.实现服务端逻辑
•引入api的依赖包
<dependency>
<groupId>com.shmilylyp.api</groupId>
<artifactId>api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
•实现接口,简单实现,只为练习
public class OrderServiceImpl implements IOrderService {
@Override
public String queryOrderList() {
return "查询订单列表信息";
}
@Override
public String queryOrderById(int id) {
return "根据编号查询订单信息";
}
}
•接下来,我们就需要考虑如何把这两个方法供例外一个项目使用了。我们其实可以通过http或者rpc等这个远程通信手段来实现。现在我们通过远程通信socket进行实现。首先,我们需要把这个两个方法的类名、方法名、方法参数以及方法类型这些信息进行通信,以便我们能够更好的定位我们需要调用的是哪一个方法。我们先在api这个模块中写一个请求的封装类:
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 2576338825573694638L;
private String className;
private String methodName;
private Object[] args;
private Class[] argType;
}
这里就不粘贴get,set方法来。值得一提的是我们这个RpcRequest类必须实现序列化,因为JVM之间传输数据时需要序列化的。然后,我们通过socket建立一个连接:
public class RpcProxyServer {
private final ExecutorService threadPool = Executors.newCachedThreadPool();
public void publisher(Object service, int port) {
ServerSocket serverSocket = null;
try {
// 监听端口信息
serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept(); // 不停的去监听请求信息
threadPool.execute(new ProcessHandle(socket, service));
}
}catch (Exception e){
e.printStackTrace();
}
}
}
这里通过一个循环,不停的去监听socket的请求。具体的处理请求过来的信息ProcessHandle代码为:
public class ProcessHandle implements Runnable{
private Socket socket;
private Object service;
public ProcessHandle(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
RpcRequest request = (RpcRequest)inputStream.readObject(); // 反序列化
Object rs = invoke(request);
System.out.println("服务端处理的结果:" + rs);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rs);
outputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 反射调用
*/
private Object invoke(RpcRequest request){
try {
Class clazz = Class.forName(request.getClassName());
// 找到目标方法
Method method = clazz.getMethod(request.getMethodName(), request.getArgType());
Object invoke = method.invoke(service, request.getArgs());
return invoke;
} catch (Exception e){
e.printStackTrace();
}
return null;
}
}
这样,我们就可以从socket客户端发过来的请求中,获取到请求的信息RpcRequest,然后通过反射调用所需要的方法来。接下来创建启动函数进行发送接口
public class App {
public static void main(String[] args) {
IOrderService iOrderService = new OrderServiceImpl();
RpcProxyServer rpcProxyServer = new RpcProxyServer();
rpcProxyServer.publisher(iOrderService, 8088);
}
}
这样我们的服务端就完成了。我们就可以启动了。启动后可以看到服务处于阻塞状态,一直在监听。
1.客户端逻辑user-serice
•实现了客户端代码,我们现在需要去考路如何远程调用它。首先第一步也是先引入依赖包,就是我们之前打包的那个api
<dependency>
<groupId>com.shmilylyp.api</groupId>
<artifactId>api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
•引入依赖包后我们就可以调用IOrderService这个接口类了。但是呢,这不是这个项目里面的接口,也没有通过spring装载,所以没有办法直接使用。服务端通过socket进行发送,客户端当然也通过socket进行调用接口了。首先,启动类是这个样子的:
public class UserServiceApplication {
public static void main(String[] args) {
RpcProxyClient rpcProxyClient = new RpcProxyClient();
IOrderService iOrderService = rpcProxyClient.clientProxy(IOrderService.class, "localhost", 8088);
System.out.println(iOrderService.queryOrderList());
System.out.println(iOrderService.queryOrderById(1024));
}
}
这里RpcProxyClient肯定是进行请求代理的逻辑了。我们的目的是调用IOrderService里面的两个方法,那肯定就需要先把这个对象给创建出来。这里通过JDK自带的代理方法进行创建。
public class RpcProxyClient {
public <T> T clientProxy(final Class<T> interfaceClazz, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceClazz.getClassLoader(),
new Class<?>[]{interfaceClazz}, new MyInvocationHandler(host, port));
}
}
public class MyInvocationHandler implements InvocationHandler {
private String host;
private int port;
public MyInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 建立远程连接
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
rpcNetTransport.createSocket();
// 传递数据 传递调用的接口、类以及参数
RpcRequest request = new RpcRequest();
request.setArgs(args);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setArgType(method.getParameterTypes());
Object send = rpcNetTransport.send(request);
return send;
}
}
通过构造函数把host和port进行传递。在invoke中进行具体逻辑处理,也就是把创建socket连接创建和RpcRequest的属性赋值。
•socket建立连接的具体实现如下:
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Socket createSocket() throws IOException {
// 创建连接
Socket socket = new Socket(host, port);
return socket;
}
public Object send(RpcRequest request) throws IOException {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
Socket socket = createSocket();
// IO操作
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
outputStream.flush(); // 清空缓冲区
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
}
return null;
}
}
这样,一个贼low的rpc就可以使用了。启动main方法,就可以看到输出结果了。
服务端输出结果:
当然,实际过程中,我们肯定不能够这么玩,这样也太麻烦了,而且不灵活。后面把它改装成注解调用的方式,就可以像springboot一样灵活了。实际也是这样的。
以上是关于手写简易RPC的主要内容,如果未能解决你的问题,请参考以下文章