RPC ---- 基于BIO实现的RPC
Posted TheWhc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC ---- 基于BIO实现的RPC相关的知识,希望对你有一定的参考价值。
基于BIO实现的RPC
Java BIO + JDK原生序列化 + JDK动态代理实现
代码实现
1、rpc-api
存放服务接口
1.1 服务接口
public interface BlogService
Blog getBlogById(Integer id);
public interface UserService
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
// 给这个服务增加一个功能
Integer insertUserId(User user);
1.2 实体类
需要传递对应的对象,分别是Blog
对象和User
对象,都需要实现序列化接口,用的是JDK自带的序列化接口,原因它需要在调用过程中是从客户端传递给服务端。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable
private Integer id;
private Integer userId;
private String title;
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
2、rpc-common
存放通用消息格式(Request、Response),枚举类、异常类
2.1 通用消息
-
Request对象
作用:消费者向提供者发送的请求对象
@Data @Builder public class RpcRequest implements Serializable /** * 待调用接口名称 */ private String interfaceName; /** * 待调用方法名称 */ private String methodName; /** * 调用方法的参数 */ private Object[] parameters; /** * 调用方法的参数类型 */ private Class<?>[] paramTypes;
-
Response对象
作用:服务提供者执行完或出错后向消费者返回的结果对象
@Data public class RpcResponse<T> implements Serializable /** * 响应状态码 */ private Integer statusCode; /** * 响应状态补充信息 */ private String message; /** * 响应数据 */ private T data; public static <T> RpcResponse<T> success(T data) RpcResponse<T> response = new RpcResponse<>(); response.setStatusCode(ResponseCode.SUCCESS.getCode()); response.setData(data); return response; public static <T> RpcResponse<T> fail(ResponseCode code) RpcResponse<T> response = new RpcResponse<>(); response.setStatusCode(code.getCode()); response.setMessage(code.getMessage()); return response;
2.2 枚举类
-
ResponseCode
作用:方法调用的响应状态码
@AllArgsConstructor @Getter public enum ResponseCode SUCCESS(200, "调用方法成功"), FAIL(500,"调用方法失败"), METHOD_NOT_FOUND(500,"未找到指定方法"), CLASS_NOT_FOUND(500,"未找到指定类"); private final int code; private final String message;
-
RpcError
作用:RPC调用过程中的错误枚举类
@AllArgsConstructor @Getter public enum RpcError SERVICE_INVOCATION_FAILURE("服务调用出现失败"), SERVICE_NOT_FOUND("找不到对应的服务"), SERVICE_NOT_IMPLEMENT_ANY_INTERFACE("注册的服务未实现接口"); private final String message;
2.3 异常类
public class RpcException extends RuntimeException
public RpcException(RpcError error, String detail)
super(error.getMessage() + ": " + detail);
public RpcException(String message, Throwable cause)
super(message, cause);
public RpcException(RpcError error)
super(error.getMessage());
3、rpc-core
RPC框架的核心实现类
3.1 服务注册类
作用:将服务暴露,即将接口的实现类注册到注册表中(先用Map存储)。
方法:注册服务、获取服务
-
注册服务
根据获取到的服务类,用Map作为映射关系,将服务接口和服务实现类进行存储。
测试服务端一旦启动,就会将接口以及对应的实现类注册起来。
-
获取服务
服务端一旦获取到客户端的请求Request,解析出来对应的接口名,就会到注册表Map中获取对应的实现类,进而调用实现类的方法,获取执行结果
3.1.1 服务注册表通用接口
public interface ServiceRegistry
/**
* 将一个服务注册进注册表
* @param service 待注册的服务实体
* @param <T> 服务实体类
*/
<T> void register(T service);
/**
* 根据服务名称获取服务实体
* @param serviceName 服务名称
* @return 服务实体
*/
Object getService(String serviceName);
3.1.2 默认服务注册表
public class DefaultServiceRegistry implements ServiceRegistry
private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
@Override
public synchronized <T> void register(T service)
String serviceName = service.getClass().getCanonicalName();
if(registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
// 利用Class对象获取到服务接口
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0)
throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
for(Class<?> i : interfaces)
serviceMap.put(i.getCanonicalName(), service);
logger.info("向接口: 注册服务: ", interfaces, serviceName);
@Override
public synchronized Object getService(String serviceName)
Object service = serviceMap.get(serviceName);
if(service == null)
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
return service;
3.2 服务端(提供者)
-
RpcServer
作用:作为远程方法调用的提供者
public class RpcServer private static final Logger logger = LoggerFactory.getLogger(RpcServer.class); private static final int CORE_POOL_SIZE = 5; private static final int MAXIMUM_POOL_SIZE = 50; private static final int KEEP_ALIVE_TIME = 60; private static final int BLOCKING_QUEUE_CAPACITY = 100; private final ExecutorService threadPool; private RequestHandler requestHandler = new RequestHandler(); private final ServiceRegistry serviceRegistry; public RpcServer(ServiceRegistry serviceRegistry) this.serviceRegistry = serviceRegistry; BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); ThreadFactory threadFactory = Executors.defaultThreadFactory(); threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory); public void start(int port) try (ServerSocket serverSocket = new ServerSocket(port)) logger.info("服务器启动……"); Socket socket; // BIO的方式监听Socket while((socket = serverSocket.accept()) != null) logger.info("消费者连接: :", socket.getInetAddress(), socket.getPort()); // 线程池创建线程处理消费者的请求 threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry)); threadPool.shutdown(); catch (IOException e) logger.error("服务器启动时有错误发生:", e);
-
RequestHandlerThread(处理线程)
作用:
- 作为处理RpcRequest的工作线程,客户端每请求一次请求,服务端用就用一个线程进行处理,为了更好的管理线程,提高资源的利用率和降低资源消耗,所以自定义了线程池
- 从服务端代码分离出来,简化了服务端代码,单一职责原则
- 负责解析得到的Request,执行服务方法,返回给客户端
- 从Request得到interfaceName
- 根据interfaceName在ServiceRegistry中获取服务端的实现类
- 将Request请求和获取到的实现类交给RequestHandler处理器处理,进一步解耦代码
- 获取处理器返回的结果,封装成Response对象,写入Socket
@AllArgsConstructor public class RequestHandlerThread implements Runnable private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class); private Socket socket; private RequestHandler requestHandler; private ServiceRegistry serviceRegistry; @Override public void run() try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 接口名称 String interfaceName = rpcRequest.getInterfaceName(); // 接口实现类 Object service = serviceRegistry.getService(interfaceName); Object result = requestHandler.handle(rpcRequest, service); objectOutputStream.writeObject(RpcResponse.success(result)); objectOutputStream.flush(); catch (IOException | ClassNotFoundException e) logger.error("调用或发送时有错误发生:", e);
-
RequestHandler(处理逻辑)
作用:进行过程调用的处理器,进行真正的方法反射执行。
RequestHandlerThread 只是一个线程,从ServiceRegistry 获取到提供服务的对象后,就会把RpcRequest 和服务对象直接交给RequestHandler 去处理,反射过程被放到了RequestHandler 里
public class RequestHandler private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class); public Object handle(RpcRequest rpcRequest, Object service) Object result = null; try result = invokeTargetMethod(rpcRequest, service); logger.info("服务: 成功调用方法:", rpcRequest.getInterfaceName(), rpcRequest.getMethodName()); catch (IllegalAccessException | InvocationTargetException e) logger.error("调用或发送时有错误发生:", e); return result; private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws IllegalAccessException, InvocationTargetException Method method; try method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes()); catch (NoSuchMethodException e) return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND); return method.invoke(service, rpcRequest.getParameters());
3.3 客户端(消费者)
-
RpcClientProxy
作用:RPC客户端代理类,动态代理封装Request对象
消费者调用服务接口方法时,实际上调用的接口代理类的invoke方法,把需要调用的接口名称、方法、参数通过代理类封装成RpcRequest对象后传递给RpcClient发送出去。
public class RpcClientProxy implements InvocationHandler private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class); private String host; private int port; public RpcClientProxy(String host, int port) this.host = host; this.port = port; @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]clazz, this); // jdk 动态代理, 每一次代理对象调用方法,会经过此方法增强 (反射获取request对象,socket发送至客户端) @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable logger.info("调用方法: #", method.getDeclaringClass().getName(), method.getName()); RpcRequest rpcRequest = RpcRequest.builder() .interfaceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .parameters(args) .paramTypes(method.getParameterTypes()) .build(); RpcClient rpcClient = new RpcClient(); return rpcClient.sendRequest(rpcRequest, host, port);
-
RpcClient
作用:远程方法调用的消费者
负责将RpcReuqest发送出去,同时接收服务端处理完结果返回的RpcResponse对象。
使用Java的序列化方式,通过Socket传输。创建一个Socket,获取ObjectOutputStream对象,然后把需要发送的对象传过去即可。接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。
public class RpcClient private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); public Object sendRequest(RpcRequest rpcRequest, String host, int port) try (Socket socket = new Socket(host, port)) ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); objectOutputStream.一个简单的基于BIO的RPC框架