Netty之启动类编解码器等源码解析及粘包拆包问题
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty之启动类编解码器等源码解析及粘包拆包问题相关的知识,希望对你有一定的参考价值。
前言
之前文章解析Netty的责任链框架及bytebuf 分析netty的 可复用 动态扩容、零拷贝机制、达到高效,API使用更加便捷等好处;这篇文章会继续分析 netty的启动类、以及编解码器、各种协议的支持、及tcp粘包拆包的解决
Netty引导
Bootstrap
public final class EchoClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NiosocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
public class NettyStarter {
public static void main(String[] args) throws InterruptedException {
// 主线程组 处理客户连接
NioEventLoopGroup mainGroup = new NioEventLoopGroup(1);
// 工人线程组,处理客户端的请求 读取 和写入
NioEventLoopGroup subGroup = new NioEventLoopGroup();
// 创建启动器, 并配置
ServerBootstrap boostrap = new ServerBootstrap();
boostrap.group(mainGroup, subGroup).option(ChannelOption.SO_BACKLOG, 1024).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO));
// 绑定端口并使用
Channel channel = boostrap.bind(8081).sync().channel();
// 监听channeal 释放并 停止 线程组
channel.closeFuture().addListeners(future -> {
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
});
}
}
- group:设置工作组
- channel:设置传输类型
- option: 启动时配置参数
- handler 对数据添加处理
这里的bind方法 在AbstractBootstrap 中实现,然后根据 doBind方法进去
创建通道的channel方法 这里需要传入对应的类进来,因此在启动的时候需要指定对应的NioSocketChannel.class
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
然后在ServerBootstrap 中init方法继续下去创建 pipline
这里会将handler给添加到pipline中
调用到channelRead方法 在eventloop中收到连接或者请求到这里 里面有run 方法循环监听的
当初始化完毕,将channel进行注册
然后这里的ChannelFuture 是获的返回值的。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
这里 启动的一个线程去注册(initAndRegister)方法,然后判断future 是否注册成功,注册成功,就调用bind方法进行绑定端口,没有成功就添加监听事件,等待注册成功,然后在绑定
最后调用到NioServerSocketChannel的 dobind绑定方法
TCP 粘包 拆包
客户端和服务端,都有一个数据缓存区 memory,因为它并不知道业务场景下数据有多大,默认是1024,它会自己定义只有到1024个字节时,才发送,有可能一次请求的数据是小于1024的,也有可能是大于1024的,因此 导致数据不正确。
客户端发送的数据如果大于1024,而一次请求是发送数据的一部分,到服务端的内存中,这时候服务端访问数据,这个数据就是错误的。这就是拆包问题
以及超时时间
出现问题的实例
public static void main(String[] args) throws Exception {
Socket socket = new Socket("127.0.0.1", 9999);
socket.setTcpNoDelay(true);
OutputStream outputStream = socket.getOutputStream();
// 消息长度固定为 160字节,包含有
// 1. 目标用户ID长度为10, 10 000 000 000 ~ 19 999 999 999
// 2. 消息内容字符串长度最多48。 按一个汉字3字节,内容的最大长度为144字节
byte[] request = new byte[160];
byte[] userId = "10000000000".getBytes();
byte[] content = "测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试测试".getBytes();
System.arraycopy(userId, 0, request, 0, 10);
System.arraycopy(content, 0, request, 10, content.length);
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
countDownLatch.countDown();
outputStream.write(request);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
countDownLatch.await();
for (int i = 0; i < 10; i++) {
outputStream.write(request);
}
Thread.sleep(2000L); // 两秒后退出
socket.close();
}
服务端用一个netty去接收数据
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.handler(new LoggingHandler(LogLevel.INFO));
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.2 打印出内容 handdler
pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
Channel channel = b.bind(new InetSocketAddress(9999)).sync().channel();
System.out.println(channel.localAddress());
channel.closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
服务端收到的结果是 都凑到一堆去了
Netty中的解决办法
Netty解码器
Netty中主要提供了抽象基类ByteToMessageDecoder 和MessageToMessageDecoder 实现了ChannelInboundHandler 接口。
- ByteToMessageDecoder:用于将接收到的二进制数据(byte)解码,得到完整的请求报文 (Message)。 抽象的类型 例如 json 对象、object
实现:
- MessageToMessageDecoder:将一个本身就包含完整报文信息的对象转换成另一个Java对象。
Netty编码器
代码实现一个解码器
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 160;
// 用来临时保留没有处理过的请求报文
ByteBuf tempMsg = Unpooled.buffer();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("收到了一次数据包,长度是:" + in.readableBytes());
// in 请求的数据
// out 将粘在一起的报文拆分后的结果保留起来
// 1、 合并报文
ByteBuf message = null;
int tmpMsgSize = tempMsg.readableBytes();
// 如果暂存有上一次余下的请求报文,则合并
if (tmpMsgSize > 0) {
message = Unpooled.buffer();
message.writeBytes(tempMsg);
message.writeBytes(in);
System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
} else {
message = in;
}
// 2、 拆分报文
// 这个场景下,一个请求固定长度为3,可以根据长度来拆分
// i+1 i+1 i+1 i+1 i+1
// 不固定长度,需要应用层协议来约定 如何计算长度
// 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
// dubbo rpc协议 = header(16) + body(不固定)
// header最后四个字节来标识body
// 长度 = 16 + body长度
// 0xda, 0xbb 魔数
int size = message.readableBytes();
int counter = size / PACKET_SIZE;
for (int i = 0; i < counter; i++) {
byte[] request = new byte[PACKET_SIZE];
// 每次从总的消息中读取3个字节的数据
message.readBytes(request);
// 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
out.add(Unpooled.copiedBuffer(request));
}
// 3、多余的报文存起来
// 第一个报文: i+ 暂存
// 第二个报文: 1 与第一次
size = message.readableBytes();
if (size != 0) {
System.out.println("多余的数据长度:" + size);
// 剩下来的数据放到tempMsg暂存
tempMsg.clear();
tempMsg.writeBytes(message.readBytes(size));
}
}
总结
整篇文章主要介绍的是Netty的启动类,及为了解决tcp的粘包和拆包,而出现的编解码器。对源码解析,在springboot中其实也是采用这种方式进行启动,将它进行整合;则需要看springboot start中的整合。
以上是关于Netty之启动类编解码器等源码解析及粘包拆包问题的主要内容,如果未能解决你的问题,请参考以下文章
Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包)
netty之粘包拆包ByteToMessageDecoder
netty之粘包拆包ByteToMessageDecoder
netty之粘包拆包ByteToMessageDecoder