攻读netty源码,解码器

Posted 猴子学java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了攻读netty源码,解码器相关的知识,希望对你有一定的参考价值。

前言

数据在网络上是使用二进制数据流进行传输的,应用程序需要读取网络数据,势必要经过解码阶段,因此解码是把二进制数据流解析成自定义协议的数据包,对应netty里面的ByteBuf。针对netty的解码器,或许会有如下疑问:

1、netty的解码器是怎么解码的?

2、netty有哪些拆箱即用的解码器?

针对这些疑问,我们将从如下几点从源码来进行分析:

1、ByteToMessageDecoder分析

2、四种常用的解码器

3、总结


一、数据读取过程分析

在分析解码器之前,先从一次数据读取的流程开始分析,了解数据是怎么调用到netty解码器的。

当客户端向服务端发送数据的之后,服务端的NioEventLoop会轮询出read事件。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

在NioByteUnsafe的read方法中,会触发fireChannelRead事件,当传播到ByteToMessageDecoder的时候,解码开始。


二、ByteToMessageDecoder分析

ByteToMessageDecoder的channelRead方法中,整个解码过程可以分三个步骤:

  • 累加字节流

  • 调用子类解码器进行解码

  • 将解析到ByteBuf往下传播

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else {             // 累加字节流 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); }            // 调用子类解码器解码 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally {            // 忽略无关代码                        // 把解析到的ByteBuf往下传播 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }}


1、累加字节流

在累加字节流的时候,如果累加器的writerIndex加上字节流中可读字节数大于累加器的最大容量,那么累加器会扩容,接着把ByteBuf写到累加器中,最后回收ByteBuf。

public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { final ByteBuf buffer;            // cumulation.writerIndex() + in.readableBytes() > cumulation.maxCapacity() if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {                // 累加器扩容 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; }            // 把读取到的ByteBuf写到累加器中 buffer.writeBytes(in); return buffer; } finally { // 回收 in.release(); } }};// 累加器扩容static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation;    // 累加器扩容后的容量=累加器中的可读字节数+ByteBuf中的可读字节数 cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); // 把老的累加器的数据写入到新的累加器中 cumulation.writeBytes(oldCumulation); // 回收老的累加器 oldCumulation.release(); return cumulation;}


2、调用子类解码器进行解码

调用callDecode方法的时候,首先循环累加器中的ByteBuf,如果已经读取到完整的ByteBuf,触发channelRead事件,首次读取的时候不会有这种情况,那么先标记累加器中可读的字节数,然后调用子类解码器解码,如果没有读取到一个完整的数据包,会继续读取,否则结束此次解码。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { // 循环累加器中的ByteBuf while (in.isReadable()) { int outSize = out.size(); if (outSize > 0) {// 已经读取完整的ByteBuf fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } // 可读字节数 int oldInputLength = in.readableBytes(); // 调用子类的decode方法            decodeRemovalReentryProtection(ctx, inout); if (ctx.isRemoved()) { break; }  if (outSize == out.size()) { // 没有读取到任何数据 if (oldInputLength == in.readableBytes()) { break; } else {// 没有读取到一个完整的ByteBuf continue; } }  if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } // 只解码一次 if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); }}


3、将解析到ByteBuf往下传播

解析到的ByteBuf,放入一个List集合中,在finally代码块中,调用fireChannelRead方法,把ByteBuf往下传播。

static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) { if (msgs instanceof CodecOutputList) {        // 把List中的ByteBuf往下传播 fireChannelRead(ctx, (CodecOutputList) msgs, numElements); } else { for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i)); } }}

ByteToMessageDecoder调用channelRead的时候,使用ThreadLocal的方式取到CodecOutputLists,然后从数组中取到一个CodecOutputList。默认情况下CodecOutputLists创建16个CodecOutputList,每个CodecOutputList的容量默认16,超过大小就会扩容一倍,它是一个可回收的、用来存放解码后的ByteBuf的List。


三、四种常用的解码器

tcp使用流的方式进行传输,所谓流,就是一串没有分界的数据。可以把它看成河中的水连成一片,没有分界

在tcp缓冲区中,一个完整的数据包可能会被拆分成多个包进行发送,也能把多个小包合并成一个进行发送,这就是TCP的粘包和拆包问题。

然而TCP底层并不关系上层的业务数据,所以在底层无可避免的会出现重组和拆分,为了解决这个问题,需要设计上层的业务协议栈。

根据主流协议的解决方案,netty封装了四种常用的解码器:

  • 基于固定长度的解码器FixedLengthFrameDecoder;

  • 行解码器LineBasedFrameDecoder;

  • 基于分隔符的解码器DelimiterBasedFrameDecoder;

  • 基于长度域的解码器LengthFieldBasedFrameDecoder。


1、基于固定长度的解码器

FixedLengthFrameDecoder是netty基于固定长度的解码器。

如果我们收到如下一段数据包:

 +---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+

然后以3个字节长度进行解码,那么我们将会收到如下数据包:

 +-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+


在decode方法中,如果ByteBuf中的可读字节数小于frameLength,那么不读取,否则截取一个固定长度的ByteBuf。

protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); }}


2、基于换行符的解码器

LineBasedFrameDecoder解码的时候分为两种模式:非丢弃模式和丢弃模式,默认情况下是非丢弃模式。

b、非丢弃模式下,如果能够找到换行符,并且截取的数据包大于解码器定义的最大长度,那么丢弃这段数据,抛出TooLongFrameException异常,不然根据截取的数据包中是否需要换行符返回截取的ByteBuf;

c、非丢弃模式下,如果不能找到换行符,并且截取的数据包大于解码器定义的最大长度,除了丢弃这段数据,抛出TooLongFrameException异常,还会标志下次解码进入丢弃模式。

d、丢弃模式下,如果能够找到换行符,除了丢弃这段数据之外,还会标志下次解码进入非丢弃模式,如果failFast设置为false,表示异常延迟抛出,此时会把上次丢弃的字节数一起抛出异常给应用程序。

e、丢弃模式下,如果不能能够找到换行符,则会继续丢弃这段数据。

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {    // 寻找换行符所在ByteBuf的地址 final int eol = findEndOfLine(buffer);    if (!discarding) { // discarding=false if (eol >= 0) { // 有数据 final ByteBuf frame;            // 截取数据包的长度 final int length = eol - buffer.readerIndex();            // 分隔符的长度 final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;            // 超过解码器截取数据包的最大长度 if (length > maxLength) {                // 移动读指针,丢弃该段数据 buffer.readerIndex(eol + delimLength); // 抛TooLongFrameException异常 fail(ctx, length); return null; } // 截取的数据包忽略换行符            if (stripDelimiter) {// 默认为false frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else {                // 把数据包和换行符一起截取出来 frame = buffer.readRetainedSlice(length + delimLength);            } return frame; } else { // 读取数据结尾 -1 // byteBuf中可读字节数 final int length = buffer.readableBytes();            if (length > maxLength) { // 超过最大长度                //记录丢弃的字节数 discardedBytes = length;                // 移动读指针到写指针的位置,说明丢弃该段数据 buffer.readerIndex(buffer.writerIndex()); // 标志进入丢弃模式 discarding = true; offset = 0;                // 是否立马触发异常 if (failFast) {// 默认为true fail(ctx, "over " + discardedBytes); } } return null; } } else { // 进入丢弃魔术 if (eol >= 0) { // 读取到数据 // 总共丢弃的数据包长度 final int length = discardedBytes + eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; // 移动读指针 buffer.readerIndex(eol + delimLength); discardedBytes = 0;            discarding = false// 标志进入非丢弃模式 if (!failFast) { fail(ctx, length); } } else { // 总共丢弃的数据包长度 discardedBytes += buffer.readableBytes(); // 移动读指针 buffer.readerIndex(buffer.writerIndex()); // We skip everything in the buffer, we need to set the offset to 0 again. offset = 0; } return null; }}

我们继续跟进一下寻找换行符的代码findEndOfLine。

private int findEndOfLine(final ByteBuf buffer) { int totalLength = buffer.readableBytes(); int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteProcessor.FIND_LF); if (i >= 0) { offset = 0; if (i > 0 && buffer.getByte(i - 1) == '\r') { i--; } } else { offset = totalLength; } return i;}

通过调用ByteBuf的forEachByte方法寻找'\n'的位置,如果是"\r\n"的换行符,那么eol的指针会指向’\r‘。


3、基于分隔符的解码器

行解码器也是一种特殊的基于分隔符的解码器,如果分隔符是 "\n" 或者"\r\n",那么会直接调用LineBasedFrameDecoder的解码逻辑。

基于分隔符的解码器DelimiterBasedFrameDecoder可以基于多个分隔符进行解码,整个解码过程可以分为:

a、行解码器

b、找到最小分隔符

c、解码

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { // 分隔符为换行符,调用行解码器 if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); } // Try all delimiters and choose the delimiter which yields the shortest frame. int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; // 找到离readIndex最近的一个分隔符 for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; }    }   if (minDelim != null) { // 找到最小分隔符 int minDelimLength = minDelim.capacity(); ByteBuf frame;        // 是否为丢弃模式,同行解码器的逻辑 if (discardingTooLongFrame) {// 丢弃模式 // We've just finished discarding a very large frame. // Go back to the initial state. discardingTooLongFrame = false;            buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } //  if (minFrameLength > maxFrameLength) { // Discard read frame. buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null;        } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); }
return frame; } else { // 没有找到最小分隔符        if (!discardingTooLongFrame) {     // 非丢弃模式             // ByteBuf的可读字节数大于规定的数据包大小 if (buffer.readableBytes() > maxFrameLength) { // 记录丢弃的字节数 tooLongFrameLength = buffer.readableBytes();                // 跳过这段数据包 buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true;// 标志进入丢弃模式 if (failFast) { // 抛出异常 fail(tooLongFrameLength); } } } else {// 丢弃模式 // Still discarding the buffer since a delimiter is not found. tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; }}

根据分隔符列表判断是否为行分隔符,分隔符需要包含:"\n" 和"\r\n"。

private static boolean isLineBased(final ByteBuf[] delimiters) { if (delimiters.length != 2) { return false; } ByteBuf a = delimiters[0]; ByteBuf b = delimiters[1]; if (a.capacity() < b.capacity()) { a = delimiters[1]; b = delimiters[0]; } return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == '\r' && a.getByte(1) == '\n' && b.getByte(0) == '\n';}

基于分隔符的解码器的解码过程和行行解码器的类似,可以根据注释去分析。


4、基于长度域的解码器

基于长度域的解码器LengthFieldBasedFrameDecoder,可以根据接收到的ByteBuf中的长度字段的值进行动态分割,这个长度字段可以表示消息体的长度或者整个消息的长度。

LengthFieldBasedFrameDecoder有很多参数配置,可以解码任何带有长度字段的消息,特别适合客户端-服务端的TCP通信协议。下面通过4个例子来分析LengthFieldBasedFrameDecoder参数的作用。

lengthFieldOffset:在二进制数据流中长度字段的偏移位置;

lengthFieldLength:长度字段的长度(占用字节数);

    lengthAdjustment 修改长度字段中定义的值,可以为负数 ;

    initialBytesToStrip 解析的时候需要跳过的字节数 。

在如下的示例中,长度的值0x000C=12表示"HELLO, WORLD"的长度,因为长度字段占用2个字节,因此解码之后的数据包有14个字节。

 lengthFieldOffset   = 0 lengthFieldLength = 2 lengthAdjustment = 0 initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+

在如下的示例中,initialBytesToStrip=2,表示解码之后跳过2个字节,因为长度字段占用2个字节,因此解码之后的数据包有12个字节。

 lengthFieldOffset = 0 lengthFieldLength = 2 lengthAdjustment = 0 initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) +--------+----------------+ +----------------+ | Length | Actual Content |----->| Actual Content | | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | +--------+----------------+ +----------------+

在如下的示例中,lengthAdjustment=-2,表示长度字段中定义的消息长度还要减去2才是需要截取的数据包长度。因为0x000E=14,表示往后截取14个字节,但是lengthAdjustment=-2,所以往后截取的字节数是12,最终解码得到14字节。

lengthFieldOffset = 0lengthFieldLength = 2lengthAdjustment = -2 (= the length of the Length field)initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+

在如下的示例中,lengthFieldOffset=2,表示长度字段在二进制数据流中往后偏移2个字节长度,找到0x00000C=12,因此解码后的字节长度等于17=12+3+2。

lengthFieldOffset = 2 (= the length of Header 1)lengthFieldLength = 3lengthAdjustment = 0initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)+----------+----------+----------------+ +----------+----------+----------------+| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content || 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |+----------+----------+----------------+ +----------+----------+----------------+

通过上面的四个例子,已经知道LengthFieldBasedFrameDecoder的主要参数在解码过程中起到的作用,下面通过源码来分析解码的过程,真正了解LengthFieldBasedFrameDecoder参数起到的作用。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {    if (discardingTooLongFrame) {// 默认为false        discardingTooLongFrame(in);// 丢弃模式下的处理 }    // lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength    // 可读字节数是否小于长度偏移地址 if (in.readableBytes() < lengthFieldEndOffset) { return null; }    // 计算长度字段在ByteBuf中的真实地址 int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;    // 取到计算长度中定义的消息长度值 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    if (frameLength < 0) {// 如果是空包,跳过长度字段所占的字节数 failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset); } // 计算需要抽取的数据包长度    frameLength += lengthAdjustment + lengthFieldEndOffset;    if (frameLength < lengthFieldEndOffset) {// 数据包长度还没有长度字段所占的长度大,跳过字节,抛出异常 failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);    }    if (frameLength > maxFrameLength) {// 数据包超过规定的最大值        // 如果ByteBuf的可读字节数大于截取数据包的长度,那么直接丢弃这段数据        // 否则进入丢弃模式,但是如果已经读取完ByteBuf,那么又会重新进入非丢弃模式 exceededFrameLength(in, frameLength); return null;    } // never overflows because it's less than maxFrameLength int frameLengthInt = (int) frameLength; if (in.readableBytes() < frameLengthInt) { return null; }    // 判断跳过的字节数超过了数据包的长度 if (initialBytesToStrip > frameLengthInt) { failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip); }    // 跳过字节    in.skipBytes(initialBytesToStrip);    //抽取数据包 int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);    // 移动读指针 in.readerIndex(readerIndex + actualFrameLength); return frame;}

从上面的源码可以看出,整个解码过程可以分为三步:

  • 计算需要抽取的数据包长度;

  • 跳过字节逻辑处理;

  • 丢弃模式下的处理。


三、总结

回到文中开头的问题,通过分析解码器的源码,可以很轻松的解惑。

netty解码器解码的时候首先累加字节流,然后调用子类的解码器进行解码,最后将解码出来额ByteBuf往下传播。

netty为了解决tcp粘包拆包的问题,根据主流的设计方案,有四种拆箱即用的解码器:基于固定长度的解码器,行解码器、基于分隔符的解码器和基于长度域的解码器。

欢迎关注个人博客:https://my.oschina.net/chenfanglin


以上是关于攻读netty源码,解码器的主要内容,如果未能解决你的问题,请参考以下文章

Netty之启动类编解码器等源码解析及粘包拆包问题

Netty源码剖析四:解码和编码

Netty-源码分析DelimiterBasedFrameDecoder

Netty编解码开发+多协议开发和应用+源码+高级特性笔记免费送

Netty编解码开发+多协议开发和应用+源码+高级特性笔记免费送

Netty4.xNetty源码分析之LineBasedFrameDecoder