RPC---- 基于Netty实现的RPC

Posted whc__

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC---- 基于Netty实现的RPC相关的知识,希望对你有一定的参考价值。

网络传输从BIO到NIO,序列化要减少字节流长度,提高序列化反序列化的效率

一、Netty服务端和客户端

1、服务端server

1.1 NettyServer

服务端接收客户端的RpcRquest请求,其中执行链中添加了对应的处理器,分别是编码器、拆包器、解码器、客户端处理器

  • 编码器(对象 -> 字节数组 -> ByteBuf(自定义协议)):发送RpcResponse响应对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
  • 拆包器:接收客户端请求的RpcRequest对象编码成的ByteBuf对象,按照基于固定长度域的拆包器进行拆包
  • 解码器(ByteBuf -> 字节数组 -> 对象(自定义协议)):对ByteBuf对象按照自定义协议进行解码成POJO对象
  • 服务端处理器:NettyServerHandler,接收客户端传送的RpcReuqest,执行客户端调用对应接口的服务方法,返回响应对象RpcResponse
/**
 * Netty中处理RpcRequest的Handler
 * @ClassName: NettyServerHandler
 * @Author: whc
 * @Date: 2021/05/29/21:49
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

	private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
	private static RequestHandler requestHandler;
	private static ServiceRegistry serviceRegistry;

	static {
		requestHandler = new RequestHandler();
		serviceRegistry = new DefaultServiceRegistry();
	}


	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
		try {
			logger.info("服务器接收到消息:{}", msg);
			String interfaceName = msg.getInterfaceName();
			Object server = serviceRegistry.getService(interfaceName);
			Object result = requestHandler.handle(msg, server);
			// 向客户端返回响应数据
			// 注意服务端处理器这里的执行链顺序,因为是ctx.writeAndFlush而不是ch.writeAndFlush,所以在执行出栈(out)时,是从当前ctx处理器从后往前找,而不是从通道最后从后往前找
			ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
			// 消息发送完毕关闭连接
			future.addListener(ChannelFutureListener.CLOSE);
		} finally {
			// 记得释放对象,防止内存泄漏
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.error("处理过程调用时有错误发生");
		cause.printStackTrace();
		ctx.close();
	}

}

1.2 NettyServerHandler

位于服务端责任链的尾部,用于接收RpcRequest,并且执行调用,执行真正的接口调用方法,返回处理结果。

/**
 * Netty中处理RpcRequest的Handler
 * @ClassName: NettyServerHandler
 * @Author: whc
 * @Date: 2021/05/29/21:49
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

	private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
	private static RequestHandler requestHandler;
	private static ServiceRegistry serviceRegistry;

	static {
		requestHandler = new RequestHandler();
		serviceRegistry = new DefaultServiceRegistry();
	}


	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
		try {
			logger.info("服务器接收到消息:{}", msg);
			String interfaceName = msg.getInterfaceName();
			Object server = serviceRegistry.getService(interfaceName);
			Object result = requestHandler.handle(msg, server);
			// 向客户端返回响应数据
			ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
			// 消息发送完毕关闭连接
			future.addListener(ChannelFutureListener.CLOSE);
		} finally {
			// 记得释放对象,防止内存泄漏
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.error("处理过程调用时有错误发生");
		cause.printStackTrace();
		ctx.close();
	}

}

2、客户端client

2.1 NettyClient

负责执行客户端调用远程服务端服务的sendRequest请求,其中执行链中添加了对应的处理器,分别是编码器、拆包器、解码器、客户端处理器

  • 编码器(对象 -> 字节数组 -> ByteBuf(自定义协议)):发送RpcRequest请求对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
  • 拆包器:接收服务端响应回来的RpcResponse对象编码成的ByteBuf对象,然后对网络数据包按照基于固定长度域的拆包器进行拆包
  • 解码器(ByteBuf -> 字节数组 -> 对象(自定义协议)):对ByteBuf对象按照自定义协议进行解码成POJO对象
  • 客户端处理器:NettyClientHandler,接收服务端传送的RpcReponse,设置服务端响应的RpcResponse标识
/**
 * NIO方式消费者客户端类
 * @ClassName: NettyClient
 * @Author: whc
 * @Date: 2021/05/29/23:07
 */
public class NettyClient implements RpcClient {

	private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
	private static final Bootstrap bootstrap;

	private CommonSerializer serializer;

	static {
		EventLoopGroup group = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		// 1. 指定线程模型
		bootstrap.group(group)
				// 2. 指定IO类型为NIO
				.channel(NiosocketChannel.class)
				// 开启TCP底层心跳机制
				.option(ChannelOption.SO_KEEPALIVE, true);
	}

	private String host;
	private int port;

	public NettyClient(String host, int port) {
		this.host = host;
		this.port = port;
	}

	@Override
	public Object sendRequest(RpcRequest rpcRequest) {
		if(serializer == null) {
			logger.error("未设置序列化器");
			throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
		}
		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				// 执行链: head -> CommonEncoder(out) -> Spliter -> CommonDecoder -> NettyClientHandler -> tail
				// out出栈主要是对写回结果进行加工
				// in入栈主要是用来读取服务端数据,写回结果
				// 发送RpcRequest请求对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
				pipeline.addLast(new CommonEncoder(serializer))
						// 接收服务端响应回来的RpcResponse对象,经过Spliter,对网络数据包按照基于固定长度域的拆包器进行拆包
						.addLast(new Spliter())
						// 对数据包按照自定义协议进行解码成POJO对象
						.addLast(new CommonDecoder())
						// 客户端对解码出来的POJO对象进行调用处理
						.addLast(new NettyClientHandler());
			}
		});
		try {
			ChannelFuture future = bootstrap.connect(host, port).sync();
			logger.info("客户端连接到服务器 {}:{}", host, port);
			Channel channel = future.channel();
			if(channel != null) {
				// 发送数据
				channel.writeAndFlush(rpcRequest).addListener(future1 -> {
					if(future1.isSuccess()) {
						logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
					} else {
						logger.error("发送消息时有错误发生: ", future1.cause());
					}
				});
				// 为了让netty不会关闭
				channel.closeFuture().sync();
				// 通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
				// AttributeKey是,线程隔离的,不会有线程安全问题。
				AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
				RpcResponse rpcResponse = channel.attr(key).get();
				RpcMessageChecker.check(rpcRequest, rpcResponse);
				return rpcResponse.getData();
			}
		} catch (InterruptedException e) {
			logger.error("发送消息时有错误发生: ", e);
		}
		return null;
	}

	@Override
	public void setSerializer(CommonSerializer serializer) {
		this.serializer = serializer;
	}
}

2.2 NettyClientHandler

位于客户端责任链的尾部,用于接收RpcResponse,并且执行调用,给channel设计别名

(判断客户端接收服务端传过来的RpcResponse是否能接收成功,客户端先通过对接收到的RpcResponse在通道中设置一个标识,接着客户端在后续操作中只要从通道获取这个标识符,取出然后判断客户端传递过去的RpcRequest中的请求号是否和服务端传递过来的RpcRespons中的响应请求号相等(对应RpcRequest的请求号),则说明双方成功通信)

/**
 * Netty客户端处理器
 * @ClassName: NettyClientHandler
 * @Author: whc
 * @Date: 2021/05/30/0:06
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

	private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
		try {
			logger.info(String.format("客户端接收到消息: %s", msg));
			// 接收到response, 给channel设计别名,让sendRequest里读取response
			AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + msg.getRequestId());
			ctx.channel().attr(key).set(msg);
			ctx.channel().close();
		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		logger.error("过程调用时有错误发生:");
		cause.printStackTrace();
		ctx.close();
	}
}

二、自定义协议和编解码器

1、协议

在传输过程中,在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。

自定义协议如下:

+---------------+---------------+-----------------+-------------+
|  Magic Number |  Package Type | Serializer Type | Data Length |
|    4 bytes    |    4 bytes    |     4 bytes     |   4 bytes   |
+---------------+---------------+-----------------+-------------+
|                          Data Bytes                           |
|                   Length: ${Data Length}                      |
+---------------------------------------------------------------+
  • Magic Number : 魔数,标识一个协议包
  • Package Type:标明是一个调用请求还是调用响应
  • Serializer Type:标明实际数据使用的序列化器(客户端和服务端应使用统一标准)
  • Data Length:实际数据的长度

2、编码器

/**
 * 通信协议的设计
 * 通用的编码拦截器
 * 负责将 POJO 对象编码成 ByteBuf
 * @ClassName: CommonEncoder
 * @Author: whc
 * @Date: 2021/05/29/20:48
 */
public class CommonEncoder extends MessageToByteEncoder {

	private static final int MAGIC_NUMBER = 0xCAFEBABE;

	private final CommonSerializer serializer;

	public CommonEncoder(CommonSerializer serializer) {
		this.serializer = serializer;
	}

	// 自定义传输协议,防止粘包
	// 消息格式为: [魔数][数据包类型][序列化器类型][数据长度][数据]
	//			  4字节   4字节      4字节       4字节
	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
		// 魔数
		out.writeInt(MAGIC_NUMBER);
		// 数据包类型
		if (msg instanceof RpcRequest) {
			out.writeInt(PackageType.REQUEST_PACK.getCode());
		} else {
			out.writeInt(PackageType.RESPONSE_PACK.getCode());
		}
		// 序列化器类型
		out.writeInt(serializer.getCode());
		byte[] bytes = serializer.serialize(msg);
		// 数据长度
		out.writeInt(bytes.length);
		// 数据
		out.writeBytes(bytes);
	}
}

3、解码器

/**
 * 通用的解码拦截器
 * 完成 ByteBuf 到 POJO 对象的解码
 * @ClassName: CommonDecoder
 * @Author: whc
 * @Date: 2021/05/29/21:24
 */
public class CommonDecoder extends ReplayingDecoder {

	private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);

	private static final int MAGIC_NUMBER = 0xCAFEBABE;


	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		int magic = in.readInt();
		if(magic != MAGIC_NUMBER) {
			logger.error("不识别的协议包:{}", magic);
			throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
		}
		int packageCode = in.readInt();
		Class<?> packageClass;
		if(packageCode == PackageType.REQUEST_PACK.getCode()) {
			packageClass = RpcRequest.class;
		} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
			packageClass = RpcResponse.class;
		} else {
			logger.error("不识别的数据包:{}", packageCode);
			throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
		}
		int serializerCode = in.readInt();
		// 获取序列化器类型
		CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
		if(serializer == null) {
			logger.error("不识别的反序列化器:{}", serializerCode);
			throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
		}
		// 数据长度
		int length = in.readInt();
		byte[] bytes = new byte[length];
		// 填充数据
		in.readBytes(bytes);
		// 反序列化
		Object obj = serializer.deserialize(bytes, packageClass);
		out.add(obj);
	}
}

4、拆包器

/**
 * Netty 提供了 LengthFieldBasedFrameDecoder,自动屏蔽 TCP 底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。
 *
 * Spliter作用:
 * 1. 基于固定长度域的拆包器,根据我们的自定义协议,把数据拼装成一个个符合我们自定义数据包大小的ByteBuf,接着根据我们的自定义协议解码器去解码
 * 2. 拒绝非本协议连接
 *
 * @ClassName: Spliter
 * @Author: whc
 * @Date: 2021/06/04/22:16
 */
public class Spliter extends LengthFieldBasedFrameDecoder {

	private static final int MAGIC_NUMBER = 0xCAFEBABE;

	// 消息格式为: [魔数][数据包类型][序列化器类型][数据长度][数据]
	//			  4字节   4字节      4字节       4字节
	private static final int LENGTH_FIELD_OFFSET = 12;
	// 数据长度
	private static final int LENGTH_FIELD_LENGTH = 4;

	public Spliter() {
		super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
	}

	@Override
	protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
		if(in.getInt(in.readerIndex()) != MAGIC_NUMBER) {
			ctx.channel().closeFuture();
			return null;
		}
		return super.decode(ctx, in);
	}
}

RPC---- 基于Netty实现的RPC

基于Zookeeper与Netty实现的分布式RPC服务

基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

自己动手,基于netty实现单机版的RPC

基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇

手写dubbo 10-基于netty实现RPC