手写RPC远程调用

Posted 程序猿Fengpt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写RPC远程调用相关的知识,希望对你有一定的参考价值。

远程调用原理

比如 A (client) 调用 B (server) 提供的方法:

  • 首先A与B之间建立一个TCP连接;

  • 然后A把需要调用的方法名以及方法参数序列化成字节流发送出去;

  • B接受A发送过来的字节流,然后反序列化得到目标方法名,方法参数,接着执行相应的方法调用并把结果返回;

  • A接受远程调用结果,输出。

  • RPC框架就是把我刚才说的这几点些细节给封装起来,给用户暴露简单友好的API使用。

这里我们采用jdk的动态代理实现

话不多说,直接看代码。

本案例分为三个模块rpc-api、rpc-cusmer、rpc-product。

一、首先暴露出来一个接口(rpc-api)

1、提供一个接口

  
    
    
  

package com.rpc.api;



public interface Api {

public String testApi(String name);

}
2、封装一个request的data

  
    
    
  

package com.rpc.data;



import java.io.Serializable;



public class RequestData implements Serializable{



/**

*

*/

private static final long serialVersionUID = 1L;

private String interfaceName;

private String methodName;

private Class<?>[]parameterTypes;

private Object[] parameter;

public String getInterfaceName() {

return interfaceName;

}

public void setInterfaceName(String interfaceName) {

this.interfaceName = interfaceName;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Class<?>[] getParameterTypes() {

return parameterTypes;

}

public void setParameterTypes(Class<?>[] parameterTypes) {

this.parameterTypes = parameterTypes;

}

public Object[] getParameter() {

return parameter;

}

public void setParameter(Object[] parameter) {

this.parameter = parameter;

}



}

二、服务的提供者(rpc-product)

1、pom依赖rpc-api
2、实现接口

  
    
    
  

package com.rpc.service;



import com.rpc.api.Api;



public class ApiImp implements Api {



public String testApi(String name) {

// TODO Auto-generated method stub

return "Hello "+name;

}



}
3、提供一个服务

  
    
    
  
package com.rpc.service;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.rpc.data.RequestData;

public class RpcServer {
//zookeper
private Map<String,Object> serviceMap=new ConcurrentHashMap<String, Object>(32);

private ThreadPoolExecutor excExecutor=new ThreadPoolExecutor(8,20, 200,TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));

public void publishService(Class<?>interfaceClass,Object instance){
this.serviceMap.put(interfaceClass.getName(), instance);
}
public void start(int port){
try {
ServerSocket serverSocket=new ServerSocket();
serverSocket.bind(new InetSocketAddress(port));
System.out.println("Ѿ.....");
while(true){
excExecutor.execute(new Tack(serverSocket.accept()));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private class Tack implements Runnable{
private Socket client;
public Tack(Socket socket){
this.client=socket;
}
public void run() {
try{
ObjectOutputStream serializer=new ObjectOutputStream(client.getOutputStream());
ObjectInputStream deserializer=new ObjectInputStream(client.getInputStream());
RequestData requestData=(RequestData) deserializer.readObject();
Object instance=serviceMap.get(requestData.getInterfaceName());
Method method= instance.getClass().getDeclaredMethod(requestData.getMethodName(),requestData.getParameterTypes());
Object result=method.invoke(instance, requestData.getParameter());
serializer.writeObject(result);
}catch(Exception e){

}

}

}


}
4、对外发布接口

  
    
    
  

package com.rpc.service;



import com.rpc.api.Api;



public class ServerMain {



public static void main(String[] args) {

RpcServer rpcServer=new RpcServer();

rpcServer.publishService(Api.class, new ApiImp());

rpcServer.start(12345);

}



}

三、服务消费者(rpc-cusmer)

1、pom依赖rpc-api
2、客户端远程代理

  
    
    
  
package com.rpc.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

import com.rpc.api.Api;
import com.rpc.data.RequestData;

public class RpcClient {
/***
* 客户端远程代理
* @param interfaceClass
* @param address
* @return
*/
@SuppressWarnings("unchecked")
public static <T> Object getRemoteProxy(final Class<?> interfaceClass,final InetSocketAddress address){
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
try{
Socket client=new Socket();
client.connect(address);
try{
ObjectOutputStream serializer=new ObjectOutputStream(client.getOutputStream());
ObjectInputStream deserializer=new ObjectInputStream(client.getInputStream());
RequestData requestData=new RequestData();
requestData.setInterfaceName(interfaceClass.getName());
requestData.setMethodName(method.getName());
requestData.setParameterTypes(method.getParameterTypes());
requestData.setParameter(args);
serializer.writeObject(requestData);
return deserializer.readObject();
}catch(Exception e){

}
}catch(Exception e){

}
return null;
}
});
}


public static void main(String[] args) {
Api api=(Api) RpcClient.getRemoteProxy(Api.class, new InetSocketAddress("localhost", 12345));
String result=api.testApi("fwpfjgwwj");
System.out.println(result);
/***
* 可以将spring框架引入,即可用spring管理对象
*/

}

}

四、说明

1、本案例采用原生Socket,方式,可以优化为nio
2、本案例采用jdk动态代理
3、可以将RpcServer更换为zookeeper,为服务注册中心
4、代码地址为
  • rpc-api:https://gitee.com/fengpt/my-project/tree/master/rpc-api

  • rpc-cusmer:https://gitee.com/fengpt/my-project/tree/master/rpc-cusmer

  • rpc-product:https://gitee.com/fengpt/my-project/tree/master/rpc-product


以上是关于手写RPC远程调用的主要内容,如果未能解决你的问题,请参考以下文章

手写简易版rpc框架,理解远程过程调用原理

手写简易版rpc框架,理解远程过程调用原理

手写简易版rpc框架,理解远程过程调用原理

手写dubbo-3rpc雏形——完成基本的远程调用

手写RPC,深入底层理解整个RPC通信

10个类手写实现 RPC 通信框架原理