Netty框架之编解码机制二(自定义协议)
Posted 木兮君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之编解码机制二(自定义协议)相关的知识,希望对你有一定的参考价值。
前言
自上篇文章Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包),小编将继续讲解netty中的编解码,以及tcp拆包粘包的解决方案代码实践,希望对大家理解有所帮助。好了话不多说进入正题。
拆包粘包的解决方案代码实践
上篇文章分享了一系列解决粘包拆包的方案,下面用代码来编写一些。
固定长度
换行
自定义分割符号
public class PacketSplicingTest {
private ServerBootstrap serverBootstrap;
@Before
public void initSocketServer() {
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(5));
serverBootstrap.channel(NioserverSocketChannel.class);
}
@Test
public void splicingTest() throws InterruptedException {
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
//使用固定长度 如果没有到4的长度则下面不会打印
//ch.pipeline().addLast(new FixedLengthFrameDecoder(4));
//换行分割,并且一行最大的长度为10
//ch.pipeline().addLast(new LineBasedFrameDecoder(10));
//特殊符号分割,下面示例为以#分割,最大读长度为10的字符串,中间true为自动会将#丢弃(即丢弃分隔符)
ByteBuf byteBuf = Unpooled.wrappedBuffer(new byte[]{'#'});
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10,true,byteBuf));
ch.pipeline().addLast(new TrackHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(8080).sync();
sync.channel().closeFuture().sync();
}
private class TrackHandler extends SimpleChannelInboundHandler {
int count = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(Charset.defaultCharset());
System.out.println(String.format("message%s:%s", ++count, message));
}
}
}
看完上述比较简单的应用后,小编带大家编写一个自定义协议简单示例。
自定义协议
首先咱们来自定义一个协议的报文,以及编解码实现流程
是不是非常简单,那么现在小编来开始愉快的编写源码。
首先是编解码器
public class ProtocolCodes extends ByteToMessageCodec<String> {
//标识符
private static int MAGIC = 0xDDDD;
//标识符 的bytebuf
private static ByteBuf MAGIC_BUF = Unpooled.copyInt(MAGIC);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String msg, ByteBuf byteBuf) throws Exception {
byte[] bytes = msg.getBytes();
byteBuf.writeInt(MAGIC);
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//是否有标识符
int indexOfMagic = indexOf(byteBuf, MAGIC_BUF);
if (indexOfMagic < 0) {
return; //需要更多的字节
}
//消息头的长度
if (!byteBuf.isReadable(indexOfMagic + 8)) {
return;//需要更多的字节
}
//读取消息头中消息长度
int length = byteBuf.slice(indexOfMagic + 4, 4).readInt();
//是否可读完整的消息体
if (!byteBuf.isReadable(indexOfMagic + 8 + length)) {
return;//需要更多的字节
}
//跳过消息头
byteBuf.skipBytes(indexOfMagic + 8);
//读取消息体
ByteBuf buf = byteBuf.readRetainedSlice(length);
String message = buf.toString(Charset.defaultCharset());
list.add(message);
}
//netty 内部工具类
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
break;
} else {
haystackIndex++;
if (haystackIndex == haystack.writerIndex() &&
needleIndex != needle.capacity() - 1) {
return -1;
}
}
}
if (needleIndex == needle.capacity()) {
// Found the needle from the haystack!
return i - haystack.readerIndex();
}
}
return -1;
}
}
客户端
public class ProtocolClient {
private Channel channel;
private void start() throws InterruptedException {
// ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(1));
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ProtocolCodes());//出站处理
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
channel = future.sync().channel();
}
public static void main(String[] args) throws Exception {
ProtocolClient client = new ProtocolClient();
client.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String line = reader.readLine();
client.channel.writeAndFlush(line);
}
}
}
服务端
public class ProtocolServer {
private ServerBootstrap bootstrap;
@Before
public void init() {
bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(8));
bootstrap.channel(NioServerSocketChannel.class);
}
@After
public void start() throws InterruptedException {
ChannelFuture future = bootstrap.bind(8080);
System.out.println("启动成功");
future.sync().channel().closeFuture().sync();
}
@Test
public void test() {
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new ProtocolCodes());
ch.pipeline().addLast(new TrackHandler());
}
}) ;
}
private static class TrackHandler extends SimpleChannelInboundHandler<String> {
int i = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(String.format("消息%s:%s", i++, msg));
ctx.writeAndFlush("返回消息");
}
}
}
测试结果:
成功编解码,这下发长度不一致的都可以了。
总结
今天主要是使用了netty编写了自定义协议是如何编解码的,当然小编自定义了一个简单的自定义协议,没有像http或dubbo等协议那么复杂,不过即使这样,大家其实也应该明白咱们的协议底层是如何编解码了吧。
看到这儿大家会不会觉得,网络编程的小伙伴确实会比写业务编程的小伙伴拿的工资高,这是有原因的。愿小编再接再厉,将底层的网络传输原理以及一些常用的协议研究一番,知道其架构的思想并掌握。期待小编后续的netty应用吧。
以上是关于Netty框架之编解码机制二(自定义协议)的主要内容,如果未能解决你的问题,请参考以下文章
Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包)