10个类手写实现 RPC 通信框架原理
Posted Java知音
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了10个类手写实现 RPC 通信框架原理相关的知识,希望对你有一定的参考价值。
autumn200.com/2020/06/21/write-rpc/
什么是rpc
RPC:remote procedure call Protocol 远程过程调用 调用远程服务,就像调用本地的服务一样,不用关心调用细节,就像调用本机的服务一样的
RPC原理
实现RPC通信的程序包括5个部分:rpc-client、客户端proxy、socket、服务端proxy、rpc-server
request
-
客户端:当rpc-client发起远程调用时,实际上是通过客户端代理 将要调用的接口、方法、参数、参数类型进行序列化,然后通过socket实时将封装调用参数的实例发送到服务端。 -
服务端:socket接收到客户端传来的信息进行反序列化,然后通过这些信息委派到具体的实现对象
response
-
服务端:目标方法执行完成后,将执行结果返回给socket -
客户端:socket接收到结果后,返回给rpc-client,调用结束
应用到的技术
-
java -
spring -
序列化 -
socket -
反射 -
动态代理
项目GitHub地址
https://github.com/autumnqfeng/write_rpc
服务端项目
项目结构
rpc-server项目包含2个子项目:order-api、order-provider
-
order-api中存放请求接口与RpcRequest(类名、方法名、参数的实体类)
-
order-provider为请求接口实现、socket、proxy相关类
order-api
order-provider
服务注册
要想实现动态调用ServiceImpl,关键就需要将service类管理起来,那问题来了,我们如何管理这些服务类呢?
我们可以参照spring中的@Service注解,通过自定义注解的方式来做到服务注册,我们定义一个注解@RpcRemoteService作用在ServiceImpl类上,将标记此注解的类名、方法名保存到Map中,以此来定位到具体实现类。
@RpcRemoteService注解
/**
* 服务端服务发现注解
*
* @author: ***
* @date: 2020/6/21 16:21
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcRemoteService {
}
服务注册类InitialMerdiator
在spring容器初始化完成之后,扫描到@RpcRemoteService标记的类,并保存到Mediator.ROUTING中。
/**
* 初始化中间代理层对象
*
* @author: ***
* @date: 2020/6/21 16:33
*/
@Component
public class InitialMerdiator implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//加了服务发布标记的bean进行远程发布
if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.ROUTING.put(routingKey, beanMethod);
}
}
return bean;
}
}
socket监听
socket监听客户端请求
socket启动类SocketServer
spring容器加载完成之后,启动socket
/**
* spring 容器启动完成之后,会发布一个ContextRefreshedEven
* 容器启动后启动socket监听
*
* @author: ***
* @date: 2020/6/21 16:51
*/
@Component
public class SocketServer implements ApplicationListener<ContextRefreshedEvent> {
private final ExecutorService executorService= Executors.newCachedThreadPool();
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ServerSocket serverSocket=null;
try {
serverSocket = new ServerSocket(8888);
while (true) {
Socket accept = serverSocket.accept();
// 线程池处理socket
executorService.execute(new ProcessorHandler(accept));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
socket处理类ProcessorHandler
处理监听到的每个socket
public class ProcessorHandler implements Runnable {
private Socket socket;
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
// 反序列化
RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
// 中间代理执行目标方法
Mediator mediator = Mediator.getInstance();
Object response = mediator.processor(rpcRequest);
System.out.println("服务端的执行结果:"+response);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(response);
outputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeStream(inputStream, outputStream);
}
}
private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
// 关闭流
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端代理
Mediator
/**
* 服务端socket与目标方法的中间代理层
*
* @author: ***
* @date: 2020/6/21 16:25
*/
public class Mediator {
/** 用来存储发布的服务的实例(服务调用的路由) */
public static Map<String, BeanMethod> ROUTING = new ConcurrentHashMap<>();
/** 单例模式创建该代理层实例 */
private volatile static Mediator instance;
private Mediator() {
}
public static Mediator getInstance() {
if (instance == null) {
synchronized (Mediator.class) {
if (instance == null) {
instance = new Mediator();
}
}
}
return instance;
}
public Object processor(RpcRequest rpcRequest) {
// 路由key
String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
BeanMethod beanMethod = ROUTING.get(routingKey);
if (beanMethod == null) {
return null;
}
// 执行目标方法
Object bean = beanMethod.getBean();
Method method = beanMethod.getMethod();
try {
return method.invoke(bean, rpcRequest.getArgs());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
BeanMethod
/**
* 中间层反射调用时,存储目标方法、目标类的实体
*
* @author: ***
* @date: 2020/6/21 16:43
*/
public class BeanMethod {
private Object bean;
private Method method;
// setter、getter略
}
客户端项目
项目结构
服务发现
服务发现我们同样使用注解来做,这就需要参照Spring中@Autowired的原理实现,我们自定义@RpcReference注解,定义在字段上,将接口实现的代理类注入到该字段上。
@RpcReference注解
/**
* 服务注入注解
*
* @author: ***
* @date: 2020/6/20 22:41
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
}
服务发现类ReferenceInvokeProxy
在spring容器初始化之前,扫描bean中所有@RpcReference注解标记的字段。
/**
* 远程动态调用service代理
*
* @author: ***
* @date: 2020/6/20 22:44
*/
@Component
public class ReferenceInvokeProxy implements BeanPostProcessor {
@Autowired
private RemoteInvocationHandler invocationHandler;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(RpcReference.class)) {
field.setAccessible(true);
// 针对这个加了RpcReference注解的字段,设置为一个代理的值
Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class<?>[]{field.getType()}, invocationHandler);
try {
// 相当于针对加了RpcReference的注解,设置了一个代理,这个代理的实现是invocationHandler
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
客户端代理
客户端动态代理InvocationHandler实现类RemoteInvocationHandler
将目标方法名、目标类名、参数信息封装到RpcRequest,然后交给socket发送到服务端。
/**
* @author: ***
* @date: 2020/6/20 22:51
*/
@Component
public class RemoteInvocationHandler implements InvocationHandler {
@Autowired
private RpcNetTransport rpcNetTransport;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
return rpcNetTransport.send(rpcRequest);
}
}
客户端socket
网络传输RpcNetTransport
/**
* rpc socket 网络传输
*
* @author: ***
* @date: 2020/6/20 22:59
*/
@Component
public class RpcNetTransport {
@Value("${rpc.host}")
private String host;
@Value("${rpc.port}")
private int port;
public Object send(RpcRequest rpcRequest) {
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
Socket socket = new Socket(host, port);
// 发送目标方法信息
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(rpcRequest);
outputStream.flush();
// 接收返回值
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
closeStream(inputStream, outputStream);
}
return null;
}
private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
// 关闭流
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
另
END
我知道你 “在看”
以上是关于10个类手写实现 RPC 通信框架原理的主要内容,如果未能解决你的问题,请参考以下文章