RPC ---- 基于BIO实现的RPC

Posted whc__

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC ---- 基于BIO实现的RPC相关的知识,希望对你有一定的参考价值。

Java BIO + JDK原生序列化 + JDK动态代理实现

代码实现

image-20210525211259596

image-20210528205440282

1、rpc-api

存放服务接口

1.1 服务接口

public interface BlogService {
	Blog getBlogById(Integer id);
}
public interface UserService {
	// 客户端通过这个接口调用服务端的实现类
	User getUserByUserId(Integer id);
	// 给这个服务增加一个功能
	Integer insertUserId(User user);
}

1.2 实体类

需要传递对应的对象,分别是Blog对象和User对象,都需要实现序列化接口,用的是JDK自带的序列化接口,原因它需要在调用过程中是从客户端传递给服务端。

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
	private Integer id;
	private Integer userId;
	private String title;
}
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
	// 客户端和服务端共有的
	private Integer id;
	private String userName;
	private Boolean sex;
}

2、rpc-common

存放通用消息格式(Request、Response),枚举类、异常类

2.1 通用消息

  • Request对象

    作用:消费者向提供者发送的请求对象

    @Data
    @Builder
    public class RpcRequest implements Serializable {
    	/**
    	 * 待调用接口名称
    	 */
    	private String interfaceName;
    
    	/**
    	 * 待调用方法名称
    	 */
    	private String methodName;
    
    	/**
    	 * 调用方法的参数
    	 */
    	private Object[] parameters;
    
    	/**
    	 * 调用方法的参数类型
    	 */
    	private Class<?>[] paramTypes;
    }
    
  • Response对象

    作用:服务提供者执行完或出错后向消费者返回的结果对象

    @Data
    public class RpcResponse<T> implements Serializable {
    
    	/**
    	 * 响应状态码
    	 */
    	private Integer statusCode;
    
    	/**
    	 * 响应状态补充信息
    	 */
    	private String message;
    
    	/**
    	 * 响应数据
    	 */
    	private T data;
    
    	public static <T> RpcResponse<T> success(T data) {
    		RpcResponse<T> response = new RpcResponse<>();
    		response.setStatusCode(ResponseCode.SUCCESS.getCode());
    		response.setData(data);
    		return response;
    	}
    
    	public static <T> RpcResponse<T> fail(ResponseCode code) {
    		RpcResponse<T> response = new RpcResponse<>();
    		response.setStatusCode(code.getCode());
    		response.setMessage(code.getMessage());
    		return response;
    	}
    }
    

2.2 枚举类

  • ResponseCode

    作用:方法调用的响应状态码

    @AllArgsConstructor
    @Getter
    public enum  ResponseCode {
    
    	SUCCESS(200, "调用方法成功"),
    	FAIL(500,"调用方法失败"),
    	METHOD_NOT_FOUND(500,"未找到指定方法"),
    	CLASS_NOT_FOUND(500,"未找到指定类");
    
    	private final int code;
    	private final String message;
    
    }
    
  • RpcError

    作用:RPC调用过程中的错误枚举类

    @AllArgsConstructor
    @Getter
    public enum  RpcError {
    
    	SERVICE_INVOCATION_FAILURE("服务调用出现失败"),
    	SERVICE_NOT_FOUND("找不到对应的服务"),
    	SERVICE_NOT_IMPLEMENT_ANY_INTERFACE("注册的服务未实现接口");
    
    	private final String message;
    }
    

2.3 异常类

public class RpcException extends RuntimeException {

	public RpcException(RpcError error, String detail) {
		super(error.getMessage() + ": " + detail);
	}

	public RpcException(String message, Throwable cause) {
		super(message, cause);
	}

	public RpcException(RpcError error) {
		super(error.getMessage());
	}
}

3、rpc-core

RPC框架的核心实现类

3.1 服务注册类

作用:将服务暴露,即将接口的实现类注册到注册表中(先用Map存储)。

方法:注册服务、获取服务

  • 注册服务

    根据获取到的服务类,用Map作为映射关系,将服务接口和服务实现类进行存储。

    测试服务端一旦启动,就会将接口以及对应的实现类注册起来。

  • 获取服务

    服务端一旦获取到客户端的请求Request,解析出来对应的接口名,就会到注册表Map中获取对应的实现类,进而调用实现类的方法,获取执行结果

3.1.1 服务注册表通用接口

public interface ServiceRegistry {

	/**
	 * 将一个服务注册进注册表
	 * @param service 待注册的服务实体
	 * @param <T> 服务实体类
	 */
	<T> void register(T service);

	/**
	 * 根据服务名称获取服务实体
	 * @param serviceName 服务名称
	 * @return 服务实体
	 */
	Object getService(String serviceName);
}

3.1.2 默认服务注册表

public class DefaultServiceRegistry implements ServiceRegistry {

	private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class);

	private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
	private final Set<String> registeredService = ConcurrentHashMap.newKeySet();

	@Override
	public synchronized <T> void register(T service) {
		String serviceName = service.getClass().getCanonicalName();
		if(registeredService.contains(serviceName)) return;
		registeredService.add(serviceName);
        // 利用Class对象获取到服务接口
		Class<?>[] interfaces = service.getClass().getInterfaces();
		if(interfaces.length == 0) {
			throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
		}
		for(Class<?> i : interfaces) {
			serviceMap.put(i.getCanonicalName(), service);
		}
		logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
	}

	@Override
	public synchronized Object getService(String serviceName) {
		Object service = serviceMap.get(serviceName);
		if(service == null) {
			throw new RpcException(RpcError.SERVICE_NOT_FOUND);
		}
		return service;
	}
}

3.2 服务端(提供者)

  • RpcServer

    作用:作为远程方法调用的提供者

    public class RpcServer {
    
    	private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
    
    	private static final int CORE_POOL_SIZE = 5;
    	private static final int MAXIMUM_POOL_SIZE = 50;
    	private static final int KEEP_ALIVE_TIME = 60;
    	private static final int BLOCKING_QUEUE_CAPACITY = 100;
    	private final ExecutorService threadPool;
    	private RequestHandler requestHandler = new RequestHandler();
    	private final ServiceRegistry serviceRegistry;
    
    	public RpcServer(ServiceRegistry serviceRegistry) {
    		this.serviceRegistry = serviceRegistry;
    		BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
    		ThreadFactory threadFactory = Executors.defaultThreadFactory();
    		threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, workingQueue, threadFactory);
    	}
    
    	public void start(int port) {
    		try (ServerSocket serverSocket = new ServerSocket(port)) {
    			logger.info("服务器启动……");
    			Socket socket;
    			// BIO的方式监听Socket
    			while((socket = serverSocket.accept()) != null) {
    				logger.info("消费者连接: {}:{}", socket.getInetAddress(), socket.getPort());
    				// 线程池创建线程处理消费者的请求
    				threadPool.execute(new RequestHandlerThread(socket, requestHandler, serviceRegistry));
    			}
    			threadPool.shutdown();
    		} catch (IOException e) {
    			logger.error("服务器启动时有错误发生:", e);
    		}
    	}
    
    }
    
  • RequestHandlerThread(处理线程)

    作用:

    1. 作为处理RpcRequest的工作线程,客户端每请求一次请求,服务端用就用一个线程进行处理,为了更好的管理线程,提高资源的利用率和降低资源消耗,所以自定义了线程池
    2. 从服务端代码分离出来,简化了服务端代码,单一职责原则
    3. 负责解析得到的Request,执行服务方法,返回给客户端
      • 从Request得到interfaceName
      • 根据interfaceName在ServiceRegistry中获取服务端的实现类
      • 将Request请求和获取到的实现类交给RequestHandler处理器处理,进一步解耦代码
      • 获取处理器返回的结果,封装成Response对象,写入Socket
    @AllArgsConstructor
    public class RequestHandlerThread implements Runnable {
    
    	private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);
    
    	private Socket socket;
    	private RequestHandler requestHandler;
    	private ServiceRegistry serviceRegistry;
    
    	@Override
    	public void run() {
    		try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
    			 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
    			RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
    			// 接口名称
    			String interfaceName = rpcRequest.getInterfaceName();
    			// 接口实现类
    			Object service = serviceRegistry.getService(interfaceName);
    			Object result = requestHandler.handle(rpcRequest, service);
    			objectOutputStream.writeObject(RpcResponse.success(result));
    			objectOutputStream.flush();
    		} catch (IOException | ClassNotFoundException e) {
    			logger.error("调用或发送时有错误发生:", e);
    		}
    	}
    
    }
    
  • RequestHandler(处理逻辑)

    作用:进行过程调用的处理器,进行真正的方法反射执行。

    RequestHandlerThread 只是一个线程,从ServiceRegistry 获取到提供服务的对象后,就会把RpcRequest 和服务对象直接交给RequestHandler 去处理,反射过程被放到了RequestHandler 里
    
    public class RequestHandler {
    
    	private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
    
    	public Object handle(RpcRequest rpcRequest, Object service) {
    		Object result = null;
    		try {
    			result = invokeTargetMethod(rpcRequest, service);
    			logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
    		} catch (IllegalAccessException | InvocationTargetException e) {
    			logger.error("调用或发送时有错误发生:", e);
    		} return result;
    	}
    
    	private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws IllegalAccessException, InvocationTargetException {
    		Method method;
    		try {
    			method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
    		} catch (NoSuchMethodException e) {
    			return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
    		}
    		return method.invoke(service, rpcRequest.getParameters());
    	}
    
    }
    

3.3 客户端(消费者)

  • RpcClientProxy

    作用:RPC客户端代理类,动态代理封装Request对象

    消费者调用服务接口方法时,实际上调用的接口代理类的invoke方法,把需要调用的接口名称、方法、参数通过代理类封装成RpcRequest对象后传递给RpcClient发送出去。

    public class RpcClientProxy implements InvocationHandler {
    
    	private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);
    	private String host;
    	private int port;
    
    	public RpcClientProxy(String host, int port) {
    		this.host = host;
    		this.port = port;
    	}
    
    	@SuppressWarnings("unchecked")
    	public <T> T getProxy(Class<T> clazz) {
    		return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    	}
    
    	// jdk 动态代理, 每一次代理对象调用方法,会经过此方法增强 (反射获取request对象,socket发送至客户端)
    	@Override
    	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    		logger.info("调用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
    		RpcRequest rpcRequest = RpcRequest.builder()
    				.interfaceName(method.getDeclaringClass().getName())
    				.methodName(method.getName())
    				.parameters(args)
    				.paramTypes(method.getParameterTypes())
    				.build();
    		RpcClient rpcClient = new RpcClient();
    		return rpcClient.sendRequest(rpcRequest, host, port);
    	}
    }
    
  • RpcClient

    作用:远程方法调用的消费者

    负责将RpcReuqest发送出去,同时接收服务端处理完结果返回的RpcResponse对象。

    使用Java的序列化方式,通过Socket传输。创建一个Socket,获取ObjectOutputStream对象,然后把需要发送的对象传过去即可。接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。
    
    public class RpcClient {
    
    	private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
    
    	public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
    		try (Socket socket = new Socket(host, port)) {
    			ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
    			ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream一个简单的基于BIO的RPC框架

    基于Spring开发的一个BIO-RPC框架(对小白很友好)

    自己动手,基于netty实现单机版的RPC

    自己实现一个RPC框架

    RPC ---- 基于ZooKeeper为注册中心实现的RPC

    RPC ---- 基于ZooKeeper为注册中心实现的RPC