手撸一个RPC
Posted Coder Club
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸一个RPC相关的知识,希望对你有一定的参考价值。
RPC 全名为Remote Process Call,也就是远程过程调用,与本地过程调用相对,RPC应用多为分布式场景,简单的理解是一个节点请求另一个节点提供的服务。gRPC是Google公司开源的RPC框架,腾讯后台使用的RPC框架之一是tRPC,本节我们先来了解RPC的基本工作过程,采用Java语言实现。有关tRPC的相关内容,可参看CSDN腾讯技术工程:https://blog.csdn.net/Tencent_TEG/article/details/118686900?spm=1001.2014.3001.5501
RPC工作流程
RPC的工作流程可以用下图来概括:
-
Client通知Server欲调用的函数,并将函数映射为对应的 Call ID -
Client向Server传参,与本地过程调用传参直接压栈不同,这里需要Client通过网络通信将调用参数转化为Byte流传给Server,Server再从Byte流中转化为自身能识别的格式,这其实就是序列化和反序列化的过程 -
Server处理后返回结果给Client,上述所有的C-S通信过程均采用TCP传输层和应用层完成
手撸一个RPC demo
序列化接口
首先定义一个序列化接口,
import java.io.Serializable;
/**
* 空接口,实现序列化接口
*/
public interface IRpcService extends Serializable { }
定义远程调用接口及其实现类
/**
* 远程调用的接口
*/
public interface IHelloService extends IRpcService {
String sayHi(String name, String message);
}
/**
* 实现空接口 IHelloService
*/
public class HelloServiceImpl implements IHelloService {
private static final long serialVersionUID = 146468468464164698L;
@Override
public String sayHi(String name, String message) {
return new StringBuilder().append(name).append(", ").append(message).toString();
}
}
定义Server接口
import java.io.IOException;
public interface Server {
int PORT = 8080;
void start() throws IOException;
void stop();
void register(Class<? extends IRpcService> serviceInterface, Class<? extends IRpcService> impl);
}
定义服务中心
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ServerCenter implements Server {
private static ThreadPoolExecutor executor =
new ThreadPoolExecutor(
5,
20,
200,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
private static final Map<String, Class<?>> serviceRegistry = new HashMap<>();
@Override
public void start() throws IOException {
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(PORT));
try {
while (true) {
executor.execute(new ServiceTask(server.accept()));
}
} finally {
server.close();
}
}
@Override
public void stop() {
executor.shutdown();
}
@Override
public void register(Class<? extends IRpcService> serviceInterface, Class<? extends IRpcService> impl) {
serviceRegistry.put(serviceInterface.getName(), impl);
}
private static class ServiceTask implements Runnable {
Socket client = null;
public ServiceTask(Socket client) {
this.client = client;
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
input = new ObjectInputStream(client.getInputStream());
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class<?>[] param = (Class<?>[]) input.readObject();
Object[] arg = (Object[]) input.readObject();
Class<?> serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException(serviceName + " not found");
}
Method method = serviceClass.getMethod(methodName, param);
Object result = method.invoke(serviceClass.newInstance(), arg);
output = new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (input != null) {
try {
input.close();
} catch(IOException e) {
e.printStackTrace();
}
}
if (output != null) {
try {
output.close();
} catch(IOException e) {
e.printStackTrace();
}
}
if (client != null) {
try {
client.close();
} catch(IOException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
ServerCenter center = new ServerCenter();
center.register(IHelloService.class, new HelloServiceImpl().getClass());
center.start();
}
}
定义Client
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
public class Client {
@SuppressWarnings("unchecked")
public static <T extends IRpcService> T getRemoteProxyObj(final Class<? extends IRpcService> serviceInterface, final InetSocketAddress addr) {
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
socket = new Socket();
socket.connect(addr);
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
} catch(Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
socket.close();
}
if (output != null) {
output.close();
}
if (input != null) {
input.close();
}
}
return null;
}
});
}
}
测试
import java.net.InetSocketAddress;
public class RpcTest {
public static void main(String[] args) {
IHelloService service = Client.getRemoteProxyObj(IHelloService.class, new InetSocketAddress(8080));
System.out.println(service.sayHi("Hello", "RPC"));
}
}
以上是关于手撸一个RPC的主要内容,如果未能解决你的问题,请参考以下文章