RPC---- 基于Netty实现的RPC
Posted whc__
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RPC---- 基于Netty实现的RPC相关的知识,希望对你有一定的参考价值。
基于Netty实现的RPC
网络传输从BIO到NIO,序列化要减少字节流长度,提高序列化反序列化的效率
一、Netty服务端和客户端
1、服务端server
1.1 NettyServer
服务端接收客户端的RpcRquest请求,其中执行链中添加了对应的处理器,分别是编码器、拆包器、解码器、客户端处理器
- 编码器(对象 -> 字节数组 -> ByteBuf(自定义协议)):发送RpcResponse响应对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
- 拆包器:接收客户端请求的RpcRequest对象编码成的ByteBuf对象,按照基于固定长度域的拆包器进行拆包
- 解码器(ByteBuf -> 字节数组 -> 对象(自定义协议)):对ByteBuf对象按照自定义协议进行解码成POJO对象
- 服务端处理器:NettyServerHandler,接收客户端传送的RpcReuqest,执行客户端调用对应接口的服务方法,返回响应对象RpcResponse
/**
* Netty中处理RpcRequest的Handler
* @ClassName: NettyServerHandler
* @Author: whc
* @Date: 2021/05/29/21:49
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RequestHandler requestHandler;
private static ServiceRegistry serviceRegistry;
static {
requestHandler = new RequestHandler();
serviceRegistry = new DefaultServiceRegistry();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
try {
logger.info("服务器接收到消息:{}", msg);
String interfaceName = msg.getInterfaceName();
Object server = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(msg, server);
// 向客户端返回响应数据
// 注意服务端处理器这里的执行链顺序,因为是ctx.writeAndFlush而不是ch.writeAndFlush,所以在执行出栈(out)时,是从当前ctx处理器从后往前找,而不是从通道最后从后往前找
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
// 消息发送完毕关闭连接
future.addListener(ChannelFutureListener.CLOSE);
} finally {
// 记得释放对象,防止内存泄漏
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("处理过程调用时有错误发生");
cause.printStackTrace();
ctx.close();
}
}
1.2 NettyServerHandler
位于服务端责任链的尾部,用于接收RpcRequest,并且执行调用,执行真正的接口调用方法,返回处理结果。
/**
* Netty中处理RpcRequest的Handler
* @ClassName: NettyServerHandler
* @Author: whc
* @Date: 2021/05/29/21:49
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RequestHandler requestHandler;
private static ServiceRegistry serviceRegistry;
static {
requestHandler = new RequestHandler();
serviceRegistry = new DefaultServiceRegistry();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
try {
logger.info("服务器接收到消息:{}", msg);
String interfaceName = msg.getInterfaceName();
Object server = serviceRegistry.getService(interfaceName);
Object result = requestHandler.handle(msg, server);
// 向客户端返回响应数据
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
// 消息发送完毕关闭连接
future.addListener(ChannelFutureListener.CLOSE);
} finally {
// 记得释放对象,防止内存泄漏
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("处理过程调用时有错误发生");
cause.printStackTrace();
ctx.close();
}
}
2、客户端client
2.1 NettyClient
负责执行客户端调用远程服务端服务的sendRequest请求,其中执行链中添加了对应的处理器,分别是编码器、拆包器、解码器、客户端处理器
- 编码器(对象 -> 字节数组 -> ByteBuf(自定义协议)):发送RpcRequest请求对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
- 拆包器:接收服务端响应回来的RpcResponse对象编码成的ByteBuf对象,然后对网络数据包按照基于固定长度域的拆包器进行拆包
- 解码器(ByteBuf -> 字节数组 -> 对象(自定义协议)):对ByteBuf对象按照自定义协议进行解码成POJO对象
- 客户端处理器:NettyClientHandler,接收服务端传送的RpcReponse,设置服务端响应的RpcResponse标识
/**
* NIO方式消费者客户端类
* @ClassName: NettyClient
* @Author: whc
* @Date: 2021/05/29/23:07
*/
public class NettyClient implements RpcClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final Bootstrap bootstrap;
private CommonSerializer serializer;
static {
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 1. 指定线程模型
bootstrap.group(group)
// 2. 指定IO类型为NIO
.channel(NiosocketChannel.class)
// 开启TCP底层心跳机制
.option(ChannelOption.SO_KEEPALIVE, true);
}
private String host;
private int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object sendRequest(RpcRequest rpcRequest) {
if(serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 执行链: head -> CommonEncoder(out) -> Spliter -> CommonDecoder -> NettyClientHandler -> tail
// out出栈主要是对写回结果进行加工
// in入栈主要是用来读取服务端数据,写回结果
// 发送RpcRequest请求对象,经过CommonEncoder编码按照自定义协议编码成ByteBuf对象
pipeline.addLast(new CommonEncoder(serializer))
// 接收服务端响应回来的RpcResponse对象,经过Spliter,对网络数据包按照基于固定长度域的拆包器进行拆包
.addLast(new Spliter())
// 对数据包按照自定义协议进行解码成POJO对象
.addLast(new CommonDecoder())
// 客户端对解码出来的POJO对象进行调用处理
.addLast(new NettyClientHandler());
}
});
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
logger.info("客户端连接到服务器 {}:{}", host, port);
Channel channel = future.channel();
if(channel != null) {
// 发送数据
channel.writeAndFlush(rpcRequest).addListener(future1 -> {
if(future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
// 为了让netty不会关闭
channel.closeFuture().sync();
// 通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
// AttributeKey是,线程隔离的,不会有线程安全问题。
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
RpcResponse rpcResponse = channel.attr(key).get();
RpcMessageChecker.check(rpcRequest, rpcResponse);
return rpcResponse.getData();
}
} catch (InterruptedException e) {
logger.error("发送消息时有错误发生: ", e);
}
return null;
}
@Override
public void setSerializer(CommonSerializer serializer) {
this.serializer = serializer;
}
}
2.2 NettyClientHandler
位于客户端责任链的尾部,用于接收RpcResponse,并且执行调用,给channel设计别名
(判断客户端接收服务端传过来的RpcResponse是否能接收成功,客户端先通过对接收到的RpcResponse在通道中设置一个标识,接着客户端在后续操作中只要从通道获取这个标识符,取出然后判断客户端传递过去的RpcRequest中的请求号是否和服务端传递过来的RpcRespons中的响应请求号相等(对应RpcRequest的请求号),则说明双方成功通信)
/**
* Netty客户端处理器
* @ClassName: NettyClientHandler
* @Author: whc
* @Date: 2021/05/30/0:06
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
logger.info(String.format("客户端接收到消息: %s", msg));
// 接收到response, 给channel设计别名,让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + msg.getRequestId());
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}
}
二、自定义协议和编解码器
1、协议
在传输过程中,在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。
自定义协议如下:
+---------------+---------------+-----------------+-------------+
| Magic Number | Package Type | Serializer Type | Data Length |
| 4 bytes | 4 bytes | 4 bytes | 4 bytes |
+---------------+---------------+-----------------+-------------+
| Data Bytes |
| Length: ${Data Length} |
+---------------------------------------------------------------+
- Magic Number : 魔数,标识一个协议包
- Package Type:标明是一个调用请求还是调用响应
- Serializer Type:标明实际数据使用的序列化器(客户端和服务端应使用统一标准)
- Data Length:实际数据的长度
2、编码器
/**
* 通信协议的设计
* 通用的编码拦截器
* 负责将 POJO 对象编码成 ByteBuf
* @ClassName: CommonEncoder
* @Author: whc
* @Date: 2021/05/29/20:48
*/
public class CommonEncoder extends MessageToByteEncoder {
private static final int MAGIC_NUMBER = 0xCAFEBABE;
private final CommonSerializer serializer;
public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}
// 自定义传输协议,防止粘包
// 消息格式为: [魔数][数据包类型][序列化器类型][数据长度][数据]
// 4字节 4字节 4字节 4字节
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 魔数
out.writeInt(MAGIC_NUMBER);
// 数据包类型
if (msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
// 序列化器类型
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
// 数据长度
out.writeInt(bytes.length);
// 数据
out.writeBytes(bytes);
}
}
3、解码器
/**
* 通用的解码拦截器
* 完成 ByteBuf 到 POJO 对象的解码
* @ClassName: CommonDecoder
* @Author: whc
* @Date: 2021/05/29/21:24
*/
public class CommonDecoder extends ReplayingDecoder {
private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if(magic != MAGIC_NUMBER) {
logger.error("不识别的协议包:{}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if(packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包:{}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
// 获取序列化器类型
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if(serializer == null) {
logger.error("不识别的反序列化器:{}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
// 数据长度
int length = in.readInt();
byte[] bytes = new byte[length];
// 填充数据
in.readBytes(bytes);
// 反序列化
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
}
4、拆包器
/**
* Netty 提供了 LengthFieldBasedFrameDecoder,自动屏蔽 TCP 底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。
*
* Spliter作用:
* 1. 基于固定长度域的拆包器,根据我们的自定义协议,把数据拼装成一个个符合我们自定义数据包大小的ByteBuf,接着根据我们的自定义协议解码器去解码
* 2. 拒绝非本协议连接
*
* @ClassName: Spliter
* @Author: whc
* @Date: 2021/06/04/22:16
*/
public class Spliter extends LengthFieldBasedFrameDecoder {
private static final int MAGIC_NUMBER = 0xCAFEBABE;
// 消息格式为: [魔数][数据包类型][序列化器类型][数据长度][数据]
// 4字节 4字节 4字节 4字节
private static final int LENGTH_FIELD_OFFSET = 12;
// 数据长度
private static final int LENGTH_FIELD_LENGTH = 4;
public Spliter() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if(in.getInt(in.readerIndex()) != MAGIC_NUMBER) {
ctx.channel().closeFuture();
return null;
}
return super.decode(ctx, in);
}
}
RPC---- 基于Netty实现的RPC
基于Netty和SpringBoot实现一个轻量级RPC框架-Client篇