Hand-Writing an RPC Remote Call
Posted by 程序猿Fengpt
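How a remote call works
Suppose A (the client) calls a method provided by B (the server):
First, A and B establish a TCP connection;
Then A serializes the name of the method to call and its arguments into a byte stream and sends it;
B receives the byte stream from A, deserializes it to obtain the target method name and arguments, executes the corresponding method call, and returns the result;
A receives the result of the remote call and prints it.
An RPC framework wraps up these details and exposes a simple, friendly API to the user.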
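The serialize and deserialize steps above can be sketched without any network at all. The following is a minimal illustration using Java's built-in object serialization; `CallRequest` and `WireFormatSketch` are hypothetical names made up for this sketch and are not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

class WireFormatSketch {
    // Client side: turn the method name and arguments into a byte stream.
    static byte[] encode(CallRequest request) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(request);
        }
        return buffer.toByteArray();
    }

    // Server side: turn the byte stream back into a method name and arguments.
    static CallRequest decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (CallRequest) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = encode(new CallRequest("testApi", new Object[] {"world"}));
        CallRequest received = decode(wire);
        // prints "testApi [world]"
        System.out.println(received.methodName + " " + Arrays.toString(received.args));
    }
}

// Hypothetical request type, for illustration only.
class CallRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final String methodName;
    final Object[] args;

    CallRequest(String methodName, Object[] args) {
        this.methodName = methodName;
        this.args = args;
    }
}
```

In a real call the byte array would travel over the TCP connection instead of staying in memory.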
Here we implement it with JDK dynamic proxies.
Without further ado, let's look at the code.
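Before the full code, here is a small sketch of what a JDK dynamic proxy does: every call on the proxy object is routed to a single `InvocationHandler`, which sees the method and its arguments. The `Greeter` interface and `ProxySketch` class are hypothetical and exist only for this illustration; the handler here just describes the call instead of sending it anywhere.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {
    // Builds a proxy whose handler returns a description of the intercepted call.
    static Greeter describingProxy() {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) {
                // An RPC client would serialize this information and send it to the server.
                return method.getName() + Arrays.toString(args);
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        // prints "greet[Tom]"
        System.out.println(describingProxy().greet("Tom"));
    }
}

// Hypothetical interface, for illustration only.
interface Greeter {
    String greet(String name);
}
```

The caller only sees a `Greeter`; that is what lets the remote call look like a local one.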
This example is split into three modules: rpc-api, rpc-cusmer, and rpc-product.
I. First, expose an interface (rpc-api)
1. Provide an interface
package com.rpc.api;

public interface Api {
    String testApi(String name);
}
2. Wrap the request data in a class
package com.rpc.data;

import java.io.Serializable;

/**
 * Describes one remote call: which interface and method to invoke, and with what arguments.
 */
public class RequestData implements Serializable {
    private static final long serialVersionUID = 1L;

    private String interfaceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameter;

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameter() {
        return parameter;
    }

    public void setParameter(Object[] parameter) {
        this.parameter = parameter;
    }
}
II. The service provider (rpc-product)
1. Declare rpc-api as a dependency in the pom
2. Implement the interface
package com.rpc.service;

import com.rpc.api.Api;

public class ApiImp implements Api {
    @Override
    public String testApi(String name) {
        return "Hello " + name;
    }
}
3. Provide a server
package com.rpc.service;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.rpc.data.RequestData;

public class RpcServer {
    // In-memory service registry; ZooKeeper could take over this role
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<String, Object>(32);
    // At most 20 worker threads and 10 queued tasks; anything beyond that is rejected
    private final ThreadPoolExecutor excExecutor = new ThreadPoolExecutor(8, 20, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));

    public void publishService(Class<?> interfaceClass, Object instance) {
        this.serviceMap.put(interfaceClass.getName(), instance);
    }

    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("Service started.....");
            while (true) {
                // Hand each accepted connection to the thread pool
                excExecutor.execute(new Task(serverSocket.accept()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class Task implements Runnable {
        private final Socket client;

        public Task(Socket socket) {
            this.client = socket;
        }

        public void run() {
            try {
                ObjectOutputStream serializer = new ObjectOutputStream(client.getOutputStream());
                // Flush the stream header so the peer's ObjectInputStream constructor does not block
                serializer.flush();
                ObjectInputStream deserializer = new ObjectInputStream(client.getInputStream());
                RequestData requestData = (RequestData) deserializer.readObject();
                // Look up the published instance and invoke the requested method by reflection
                Object instance = serviceMap.get(requestData.getInterfaceName());
                Method method = instance.getClass().getDeclaredMethod(requestData.getMethodName(), requestData.getParameterTypes());
                Object result = method.invoke(instance, requestData.getParameter());
                serializer.writeObject(result);
                serializer.flush();
            } catch (Exception e) {
                // Report the failure instead of swallowing it silently
                e.printStackTrace();
            } finally {
                try {
                    // Always close the connection so the client is not left waiting
                    client.close();
                } catch (IOException ignored) {
                    // Nothing more can be done if closing fails
                }
            }
        }
    }
}
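The core of the server's task is the lookup-then-invoke step: find the published instance by interface name, find the method by name and parameter types, then call it reflectively. Here is a standalone sketch of just that step, with no sockets involved. `DispatchSketch`, `Echo`, and `EchoImpl` are hypothetical names for illustration, and the explicit check for an unknown service is an addition that the server above does not have.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class DispatchSketch {
    // Maps an interface name to the object that implements it.
    private static final Map<String, Object> SERVICES = new ConcurrentHashMap<String, Object>();

    static void publish(Class<?> interfaceClass, Object instance) {
        SERVICES.put(interfaceClass.getName(), instance);
    }

    // Resolves the target instance and method, then invokes it with the given arguments.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) throws Exception {
        Object instance = SERVICES.get(interfaceName);
        if (instance == null) {
            throw new IllegalArgumentException("No service published for " + interfaceName);
        }
        Method method = instance.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(instance, args);
    }

    public static void main(String[] args) throws Exception {
        publish(Echo.class, new EchoImpl());
        Object result = dispatch(Echo.class.getName(), "echo",
                new Class<?>[] {String.class}, new Object[] {"hi"});
        // prints "echo: hi"
        System.out.println(result);
    }
}

// Hypothetical service interface and implementation, for illustration only.
interface Echo {
    String echo(String text);
}

class EchoImpl implements Echo {
    public String echo(String text) {
        return "echo: " + text;
    }
}
```

Passing the parameter types along with the method name is what lets the lookup pick the right overload.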
4. Publish the service
package com.rpc.service;

import com.rpc.api.Api;

public class ServerMain {
    public static void main(String[] args) {
        RpcServer rpcServer = new RpcServer();
        rpcServer.publishService(Api.class, new ApiImp());
        rpcServer.start(12345);
    }
}
III. The service consumer (rpc-cusmer)
1. Declare rpc-api as a dependency in the pom
2. The client-side remote proxy
package com.rpc.client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

import com.rpc.api.Api;
import com.rpc.data.RequestData;

public class RpcClient {
    /**
     * Creates a client-side proxy for a remote service.
     *
     * @param interfaceClass the service interface to proxy
     * @param address the address of the RPC server
     * @return a proxy that forwards every method call to the server
     */
    @SuppressWarnings("unchecked")
    public static <T> T getRemoteProxy(final Class<T> interfaceClass, final InetSocketAddress address) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Open one connection per call and close it when the call ends
                try (Socket client = new Socket()) {
                    client.connect(address);
                    ObjectOutputStream serializer = new ObjectOutputStream(client.getOutputStream());
                    // Flush the stream header so the server's ObjectInputStream constructor does not block
                    serializer.flush();
                    ObjectInputStream deserializer = new ObjectInputStream(client.getInputStream());
                    RequestData requestData = new RequestData();
                    requestData.setInterfaceName(interfaceClass.getName());
                    requestData.setMethodName(method.getName());
                    requestData.setParameterTypes(method.getParameterTypes());
                    requestData.setParameter(args);
                    serializer.writeObject(requestData);
                    serializer.flush();
                    // Failures propagate to the caller instead of becoming a silent null
                    return deserializer.readObject();
                }
            }
        });
    }

    public static void main(String[] args) {
        Api api = RpcClient.getRemoteProxy(Api.class, new InetSocketAddress("localhost", 12345));
        String result = api.testApi("fwpfjgwwj");
        System.out.println(result);
        // Spring could be introduced here to manage these objects
    }
}
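To see the request and response exchange in one process, the sketch below starts a one-shot server on an ephemeral loopback port and sends it a single string. It is a condensed illustration under stated assumptions, not the project's code: `LoopbackSketch`, `roundTrip`, and `serveOnce` are hypothetical names, and a plain `String` stands in for `RequestData`.

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class LoopbackSketch {
    // Sends one name to a one-shot server on the loopback interface and returns its reply.
    static String roundTrip(String name) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverThread = new Thread(() -> serveOnce(server));
            serverThread.start();
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
                out.flush(); // push the stream header to the peer
                ObjectInputStream in = new ObjectInputStream(client.getInputStream());
                out.writeObject(name);
                out.flush();
                String reply = (String) in.readObject();
                serverThread.join();
                return reply;
            }
        }
    }

    // Accepts a single connection, reads one name, and writes back a greeting.
    static void serveOnce(ServerSocket server) {
        try (Socket socket = server.accept()) {
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.flush(); // push the stream header to the peer
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            String name = (String) in.readObject();
            out.writeObject("Hello " + name);
            out.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // prints "Hello Tom"
        System.out.println(roundTrip("Tom"));
    }
}
```

Both sides create the `ObjectOutputStream` before the `ObjectInputStream` and flush it right away, because the `ObjectInputStream` constructor waits for the peer's stream header.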
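IV. Notes
1. This example uses plain blocking sockets; it could be optimized with NIO
2. This example uses JDK dynamic proxies
3. The in-memory service map in RpcServer could be replaced by ZooKeeper acting as the service registry
4. The code is available at:
rpc-api:https://gitee.com/fengpt/my-project/tree/master/rpc-api
rpc-cusmer:https://gitee.com/fengpt/my-project/tree/master/rpc-cusmer
rpc-product:https://gitee.com/fengpt/my-project/tree/master/rpc-product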