攻读netty源码,解码器
Posted 猴子学java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了攻读netty源码,解码器相关的知识,希望对你有一定的参考价值。
前言
数据在网络上是使用二进制数据流进行传输的,应用程序需要读取网络数据,势必要经过解码阶段,因此解码是把二进制数据流解析成自定义协议的数据包,对应netty里面的ByteBuf。针对netty的解码器,或许会有如下疑问:
1、netty的解码器是怎么解码的?
2、netty有哪些拆箱即用的解码器?
针对这些疑问,我们将从如下几点从源码来进行分析:
1、ByteToMessageDecoder分析
2、四种常用的解码器
3、总结
一、数据读取过程分析
在分析解码器之前,先从一次数据读取的流程开始分析,了解数据是怎么调用到netty解码器的。
当客户端向服务端发送数据的之后,服务端的NioEventLoop会轮询出read事件。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
在NioByteUnsafe的read方法中,会触发fireChannelRead事件,当传播到ByteToMessageDecoder的时候,解码开始。
二、ByteToMessageDecoder分析
在ByteToMessageDecoder的channelRead方法中,整个解码过程可以分三个步骤:
累加字节流
调用子类解码器进行解码
将解析到ByteBuf往下传播
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 累加字节流
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 调用子类解码器解码
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
// 忽略无关代码
// 把解析到的ByteBuf往下传播
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
1、累加字节流
在累加字节流的时候,如果累加器的writerIndex加上字节流中可读字节数大于累加器的最大容量,那么累加器会扩容,接着把ByteBuf写到累加器中,最后回收ByteBuf。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
// cumulation.writerIndex() + in.readableBytes() > cumulation.maxCapacity()
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// 累加器扩容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 把读取到的ByteBuf写到累加器中
buffer.writeBytes(in);
return buffer;
} finally {
// 回收
in.release();
}
}
};
// 累加器扩容
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
// 累加器扩容后的容量=累加器中的可读字节数+ByteBuf中的可读字节数
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
// 把老的累加器的数据写入到新的累加器中
cumulation.writeBytes(oldCumulation);
// 回收老的累加器
oldCumulation.release();
return cumulation;
}
2、调用子类解码器进行解码
调用callDecode方法的时候,首先循环累加器中的ByteBuf,如果已经读取到完整的ByteBuf,触发channelRead事件,首次读取的时候不会有这种情况,那么先标记累加器中可读的字节数,然后调用子类解码器解码,如果没有读取到一个完整的数据包,会继续读取,否则结束此次解码。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 循环累加器中的ByteBuf
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {// 已经读取完整的ByteBuf
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 可读字节数
int oldInputLength = in.readableBytes();
// 调用子类的decode方法
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
// 没有读取到任何数据
if (oldInputLength == in.readableBytes()) {
break;
} else {// 没有读取到一个完整的ByteBuf
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
// 只解码一次
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
3、将解析到ByteBuf往下传播
解析到的ByteBuf,放入一个List集合中,在finally代码块中,调用fireChannelRead方法,把ByteBuf往下传播。
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
// 把List中的ByteBuf往下传播
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}
ByteToMessageDecoder调用channelRead的时候,使用ThreadLocal的方式取到CodecOutputLists,然后从数组中取到一个CodecOutputList。默认情况下CodecOutputLists创建16个CodecOutputList,每个CodecOutputList的容量默认16,超过大小就会扩容一倍,它是一个可回收的、用来存放解码后的ByteBuf的List。
三、四种常用的解码器
tcp使用流的方式进行传输,所谓流,就是一串没有分界的数据。可以把它看成河中的水连成一片,没有分界。
在tcp缓冲区中,一个完整的数据包可能会被拆分成多个包进行发送,也能把多个小包合并成一个进行发送,这就是TCP的粘包和拆包问题。
然而TCP底层并不关系上层的业务数据,所以在底层无可避免的会出现重组和拆分,为了解决这个问题,需要设计上层的业务协议栈。
根据主流协议的解决方案,netty封装了四种常用的解码器:
基于固定长度的解码器FixedLengthFrameDecoder;
行解码器LineBasedFrameDecoder;
基于分隔符的解码器DelimiterBasedFrameDecoder;
基于长度域的解码器LengthFieldBasedFrameDecoder。
1、基于固定长度的解码器
FixedLengthFrameDecoder是netty基于固定长度的解码器。
如果我们收到如下一段数据包:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+
然后以3个字节长度进行解码,那么我们将会收到如下数据包:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
在decode方法中,如果ByteBuf中的可读字节数小于frameLength,那么不读取,否则截取一个固定长度的ByteBuf。
protected Object decode(
in) throws Exception { ChannelHandlerContext ctx, ByteBuf
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
2、基于换行符的解码器
LineBasedFrameDecoder解码的时候分为两种模式:非丢弃模式和丢弃模式,默认情况下是非丢弃模式。
b、非丢弃模式下,如果能够找到换行符,并且截取的数据包大于解码器定义的最大长度,那么丢弃这段数据,抛出TooLongFrameException异常,不然根据截取的数据包中是否需要换行符返回截取的ByteBuf;
c、非丢弃模式下,如果不能找到换行符,并且截取的数据包大于解码器定义的最大长度,除了丢弃这段数据,抛出TooLongFrameException异常,还会标志下次解码进入丢弃模式。
d、丢弃模式下,如果能够找到换行符,除了丢弃这段数据之外,还会标志下次解码进入非丢弃模式,如果failFast设置为false,表示异常延迟抛出,此时会把上次丢弃的字节数一起抛出异常给应用程序。
e、丢弃模式下,如果不能能够找到换行符,则会继续丢弃这段数据。
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 寻找换行符所在ByteBuf的地址
final int eol = findEndOfLine(buffer);
if (!discarding) { // discarding=false
if (eol >= 0) { // 有数据
final ByteBuf frame;
// 截取数据包的长度
final int length = eol - buffer.readerIndex();
// 分隔符的长度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 超过解码器截取数据包的最大长度
if (length > maxLength) {
// 移动读指针,丢弃该段数据
buffer.readerIndex(eol + delimLength);
// 抛TooLongFrameException异常
fail(ctx, length);
return null;
}
// 截取的数据包忽略换行符
if (stripDelimiter) {// 默认为false
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
// 把数据包和换行符一起截取出来
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else { // 读取数据结尾 -1
// byteBuf中可读字节数
final int length = buffer.readableBytes();
if (length > maxLength) { // 超过最大长度
//记录丢弃的字节数
discardedBytes = length;
// 移动读指针到写指针的位置,说明丢弃该段数据
buffer.readerIndex(buffer.writerIndex());
// 标志进入丢弃模式
discarding = true;
offset = 0;
// 是否立马触发异常
if (failFast) {// 默认为true
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else { // 进入丢弃魔术
if (eol >= 0) { // 读取到数据
// 总共丢弃的数据包长度
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 移动读指针
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false; // 标志进入非丢弃模式
if (!failFast) {
fail(ctx, length);
}
} else {
// 总共丢弃的数据包长度
discardedBytes += buffer.readableBytes();
// 移动读指针
buffer.readerIndex(buffer.writerIndex());
// We skip everything in the buffer, we need to set the offset to 0 again.
offset = 0;
}
return null;
}
}
我们继续跟进一下寻找换行符的代码findEndOfLine。
private int findEndOfLine(final ByteBuf buffer) {
int totalLength = buffer.readableBytes();
int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF);
if (i >= 0) {
offset = 0;
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
} else {
offset = totalLength;
}
return i;
}
通过调用ByteBuf的forEachByte方法寻找'\n'的位置,如果是"\r\n"的换行符,那么eol的指针会指向’\r‘。
3、基于分隔符的解码器
行解码器也是一种特殊的基于分隔符的解码器,如果分隔符是 "\n" 或者"\r\n",那么会直接调用LineBasedFrameDecoder的解码逻辑。
基于分隔符的解码器DelimiterBasedFrameDecoder可以基于多个分隔符进行解码,整个解码过程可以分为:
a、行解码器
b、找到最小分隔符
c、解码
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 分隔符为换行符,调用行解码器
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
// 找到离readIndex最近的一个分隔符
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) { // 找到最小分隔符
int minDelimLength = minDelim.capacity();
ByteBuf frame;
// 是否为丢弃模式,同行解码器的逻辑
if (discardingTooLongFrame) {// 丢弃模式
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
//
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
// 没有找到最小分隔符
if (!discardingTooLongFrame) { // 非丢弃模式
// ByteBuf的可读字节数大于规定的数据包大小
if (buffer.readableBytes() > maxFrameLength) {
// 记录丢弃的字节数
tooLongFrameLength = buffer.readableBytes();
// 跳过这段数据包
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;// 标志进入丢弃模式
if (failFast) {
// 抛出异常
fail(tooLongFrameLength);
}
}
} else {// 丢弃模式
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
根据分隔符列表判断是否为行分隔符,分隔符需要包含:"\n" 和"\r\n"。
private static boolean isLineBased(final ByteBuf[] delimiters) {
if (delimiters.length != 2) {
return false;
}
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}
基于分隔符的解码器的解码过程和行行解码器的类似,可以根据注释去分析。
4、基于长度域的解码器
基于长度域的解码器LengthFieldBasedFrameDecoder,可以根据接收到的ByteBuf中的长度字段的值进行动态分割,这个长度字段可以表示消息体的长度或者整个消息的长度。
LengthFieldBasedFrameDecoder有很多参数配置,可以解码任何带有长度字段的消息,特别适合客户端-服务端的TCP通信协议。下面通过4个例子来分析LengthFieldBasedFrameDecoder参数的作用。
lengthFieldOffset:在二进制数据流中长度字段的偏移位置;
lengthFieldLength:长度字段的长度(占用字节数);
lengthAdjustment 修改长度字段中定义的值,可以为负数 ;
initialBytesToStrip 解析的时候需要跳过的字节数 。
在如下的示例中,长度的值0x000C=12表示"HELLO, WORLD"的长度,因为长度字段占用2个字节,因此解码之后的数据包有14个字节。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
在如下的示例中,initialBytesToStrip=2,表示解码之后跳过2个字节,因为长度字段占用2个字节,因此解码之后的数据包有12个字节。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+----------------+
Length | Actual Content |----->| Actual Content |
0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+----------------+
在如下的示例中,lengthAdjustment=-2,表示长度字段中定义的消息长度还要减去2才是需要截取的数据包长度。因为0x000E=14,表示往后截取14个字节,但是lengthAdjustment=-2,所以往后截取的字节数是12,最终解码得到14字节。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 (= the length of the Length field)
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
在如下的示例中,lengthFieldOffset=2,表示长度字段在二进制数据流中往后偏移2个字节长度,找到0x00000C=12,因此解码后的字节长度等于17=12+3+2。
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
通过上面的四个例子,已经知道LengthFieldBasedFrameDecoder的主要参数在解码过程中起到的作用,下面通过源码来分析解码的过程,真正了解LengthFieldBasedFrameDecoder参数起到的作用。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {// 默认为false
discardingTooLongFrame(in);// 丢弃模式下的处理
}
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
// 可读字节数是否小于长度偏移地址
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
// 计算长度字段在ByteBuf中的真实地址
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 取到计算长度中定义的消息长度值
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {// 如果是空包,跳过长度字段所占的字节数
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// 计算需要抽取的数据包长度
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {// 数据包长度还没有长度字段所占的长度大,跳过字节,抛出异常
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {// 数据包超过规定的最大值
// 如果ByteBuf的可读字节数大于截取数据包的长度,那么直接丢弃这段数据
// 否则进入丢弃模式,但是如果已经读取完ByteBuf,那么又会重新进入非丢弃模式
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
// 判断跳过的字节数超过了数据包的长度
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
// 跳过字节
in.skipBytes(initialBytesToStrip);
//抽取数据包
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
// 移动读指针
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
从上面的源码可以看出,整个解码过程可以分为三步:
计算需要抽取的数据包长度;
跳过字节逻辑处理;
丢弃模式下的处理。
三、总结
回到文中开头的问题,通过分析解码器的源码,可以很轻松的解惑。
netty解码器解码的时候首先累加字节流,然后调用子类的解码器进行解码,最后将解码出来额ByteBuf往下传播。
netty为了解决tcp粘包拆包的问题,根据主流的设计方案,有四种拆箱即用的解码器:基于固定长度的解码器,行解码器、基于分隔符的解码器和基于长度域的解码器。
欢迎关注个人博客:https://my.oschina.net/chenfanglin
以上是关于攻读netty源码,解码器的主要内容,如果未能解决你的问题,请参考以下文章
Netty-源码分析DelimiterBasedFrameDecoder
Netty编解码开发+多协议开发和应用+源码+高级特性笔记免费送