Netty 消息接收类故障案例分析
Posted InfoQ
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty 消息接收类故障案例分析相关的知识,希望对你有一定的参考价值。
尽管 Netty 应用广泛,非常成熟,但是由于对 Netty 底层机制不太了解,用户在实际使用中还是会经常遇到各种问题,大部分问题都是业务使用不当导致的。Netty 使用者需要学习 Netty 的故障定位技巧,以便出了问题能够独立、快速的解决。
在各种故障中,Netty 服务端接收不到客户端消息是一种比较常见的异常,大部分场景下都是用户使用不当导致的,下面我们对常见的消息接收接类故障进行分析和总结。
如果业务的 ChannelHandler 接收不到消息,可能的原因如下:
业务的解码 ChannelHandler 存在 BUG,导致消息解码失败,没有投递到后端。
业务发送的是畸形或者错误码流(例如长度错误),导致业务解码 ChannelHandler 无法正确解码出业务消息。
业务 ChannelHandler 执行了一些耗时或者阻塞操作,导致 Netty 的 NioEventLoop 被挂住,无法读取消息。
执行业务 ChannelHandler 的线程池队列积压,导致新接收的消息在排队,没有得到及时处理。
对方确实没有发送消息。
定位策略如下:
在业务的首个 ChannelHandler 的 channelRead 方法中打断点调试,看是否读取到消息。
在 ChannelHandler 中添加 LoggingHandler,打印接口日志。
查看 NioEventLoop 线程状态,看是否发生了阻塞。
通过 tcpdump 抓包看消息是否发送成功。
车联网服务端使用 Netty 构建,接收车载终端的请求消息,然后下发给后端其它系统,最后返回应答给车载终端。系统运行一段时间后发现服务端接收不到车载终端消息,导致业务中断,需要尽快定位出问题原因。
服务端运行一段时间之后,发现无法接收到车载终端的消息,相关日志示例如下:
图 1 车联网服务端无法接收消息日志
从日志看,服务端每隔一段时间(示例中是 15 秒,实际业务时间是随机的)就会接收不到消息,隔一段时间之后恢复,然后又没消息,周而复始。跟车载终端确认,终端设备每隔固定周期就会发送消息给服务端(日志分析),因此排除是终端没发消息导致的问题。怀疑是不是服务端负载过重,抢占不到 CPU 资源导致的周期性阻塞,采集 CPU 使用率,发现 CPU 资源不是瓶颈,排除 CPU 占用率高问题。
排除 CPU 之后,怀疑是不是内存有问题,导致频繁 GC 引起业务线程暂停。采集 GC 统计数据,示例如下:
图 2 GC 数据采集
通过 CPU 和内存资源占用监控分析,发现硬件资源不是瓶颈,问题应该出在服务端代码侧。
从现象上看,服务端接收不到消息,排除 GC、网络等问题之后,很有可能是 Netty 的 NioEventLoop 线程阻塞,导致 TCP 缓冲区的数据没有及时读取,故障期间采集服务端的线程堆栈进行分析,示例如下:
图 3 故障期间服务端线程堆栈
从线程堆栈分析,Netty 的 NioEventLoop 读取到消息后,调用业务线程池执行业务逻辑时,发生了 RejectedExecutionException 异常,由于后续业务逻辑由 NioEventLoop 线程执行,因此可以判断业务使用了 CallerRunsPolicy 策略,即当业务线程池消息队列满之后,由调用方线程来执行当前的 Runnable。NioEventLoop 在执行业务任务时发生了阻塞,导致 NioEventLoop 线程无法处理网络读写消息,因此会看到服务端没有消息接入,当从阻塞状态恢复之后,就可以继续接收消息。
如果后端业务逻辑处理慢,则会导致业务线程池阻塞队列积压,当积压达到上限容量之后,JDK 会抛出 RejectedExecutionException 异常,由于业务设置了 CallerRunsPolicy 策略,就会由调用方线程 NioEventLoop 执行业务逻辑,最终导致 NioEventLoop 线程被阻塞,无法读取请求消息。
除了 JDK 线程池异常处理策略使用不当之外,有些业务喜欢自己写阻塞队列,当队列满之后,向队列加入新的消息会阻塞当前线程,直到消息能够加入到队列中。案例中的车联网服务端真实业务代码就是此类问题:当转发给下游系统发生某些故障时,会导致业务定义的阻塞队列无法弹出消息进行处理,当队列积压满之后,就会阻塞 Netty 的 NIO 线程,而且无法自动恢复。
由于 ChannelHandler 是业务代码和 Netty 框架交汇的地方,ChannelHandler 里面的业务逻辑通常由 NioEventLoop 线程执行,因此防止业务代码阻塞 NioEventLoop 线程就显得非常重要,常见的阻塞情况有两类:
直接在 ChannelHandler 写可能导致程序阻塞的代码,包括但不限于数据库操作、第三方服务调用、中间件服务调用、同步获取锁、Sleep 等。
切换到业务线程池或者业务消息队列做异步处理时发生了阻塞,最典型的如阻塞队列、同步获取锁等。
在实际项目中,推荐业务处理线程和 Netty 网络 I/O 线程分离策略,原因如下:
充分利用多核的并行处理能力:I/O 线程和业务线程分离,双方可以并行的处理网络 I/O 和业务逻辑,充分利用多核的并行计算能力,提升性能。
故障隔离:后端的业务线程池处理各种类型的业务消息,有些是 I/O 密集型、有些是 CPU 密集型、有些是纯内存计算型,不同的业务处理时延,以及发生故障的概率都是不同的。如果把业务线程和 I/O 线程合并,就会存在如下问题:
某类业务处理较慢,阻塞 I/O 线程,导致其它处理较快的业务消息的响应无法及时发送出去。
即便是同类业务,如果使用同一个 I/O 线程同时处理业务逻辑和 I/O 读写,如果请求消息的业务逻辑处理较慢,同样会导致响应消息无法及时发送出去。
可维护性:I/O 线程和业务线程分离之后,双方职责单一,有利于代码维护和问题定位。如果合并在一起执行,当 RPC 调用时延增大之后,到底是网络问题、还是 I/O 线程问题、还是业务逻辑问题导致的时延大,纠缠在一起,问题定位难度非常大。例如业务线程中访问缓存或者数据库偶尔时延增大,就会导致 I/O 线程被阻塞,时延出现毛刺,这些时延毛刺的定位,难度非常大。
Netty I/O 线程和业务逻辑处理线程分离之后,线程模型如下所示:
图 4 Netty 业务线程和网络 I/O 线程分离
生产环境的 MQTT 服务运行一段时间之后,发现有新的端侧设备无法接入,连接超时。分析 MQTT 服务端日志,没有明显的异常,但是内存占用较高,查看连接数,发现有数 10 万个 TCP 连接处于 ESTABLISHED 状态,实际的 MQTT 连接数应该在 1 万个左右,显然这么多的连接肯定存在问题。
由于 MQTT 服务端的内存是按照 2 万个左右连接数规模配置的,因此当连接数达到数十万规模之后,导致了服务端大量 SocketChannel 积压,内存暴涨,高频率的 GC 和较长的 STW 时间对端侧设备的接入造成了很大影响,导致部分设备 MQTT 握手超时,无法接入。
通过抓包分析发现,一些端侧设备并没有按照 MQTT 协议规范进行处理,包括:
客户端发起 CONNECT 连接,SSL 握手成功之后没有按照协议规范继续处理,例如发送 PING 命令。
客户端发起 TCP 连接,不做 SSL 握手,也不做后续处理,导致 TCP 连接被挂起。
由于服务端是严格按照 MQTT 协议规范实现的,上述端侧设备不按规范接入,实际上消息调度不到 MQTT 应用协议层。MQTT 服务端依赖 Keep Alive 机制做超时检测,当一段时间接收不到客户端的心跳和业务消息时,就会触发心跳超时,关闭连接。针对上述两种接入场景,由于 MQTT 的连接流程没有完成,MQTT 协议栈不认为这个是合法的 MQTT 连接,因此心跳保护机制无法对上述 TCP 连接做检测。客户端和服务端都没有主动关闭这个连接,导致 TCP 连接一直保持。
问题原因如下所示:
图 5 MQTT 连接建立过程
针对这种不遵循 MQTT 规范的端侧设备,除了要求对方按照规范修改之外,服务端还需要做可靠性保护,具体策略如下:
端侧设备的 TCP 连接接入之后,启动一个链路检测定时器加入到 Channel 对应的 NioEventLoop 中。
链路检测定时器一旦触发,就主动关闭 TCP 连接。
TCP 连接完成 MQTT 协议层的 CONNECT 之后,删除之前创建的链路检测定时器。
生产环境升级补丁版本之后,平稳运行,查看 MQTT 连接数,稳定在 1 万个左右,与预期一致,问题得到解决。
对于 MQTT 服务端,除了要遵循协议规范之外,还需要对那些不遵循规范的客户端接入做保护,不能因为一些客户端没按照规范实现,导致服务端无法正常工作。系统的可靠性设计更多的是在异常场景下保护系统稳定运行。
针对 Channel 上发生的各种网络操作,例如链路创建、链路关闭、消息读写、链路注册和去注册等,Netty 将这些消息封装成事件,触发 ChannelPipeline 调用 ChannelHandler 链,由系统或者用户实现的 ChannelHandler 对网络事件做处理。
由于网络事件种类比较多,触发和执行机制也存在一些差异,如果掌握不到位,很有可能遇到一些莫名其妙的问题。而且有些问题只有在高并发或者生产环境出现,测试床不容易复现,因此这类问题定位难度很大。
故障场景:业务基于 Netty 开发了 HTTP Server,在生产环境运行一段时间之后,部分消息逻辑处理错误,但是在灰度测试环境验证却无法重现问题,需要尽快定位并解决。
在生产环境中将某一个服务实例的调测日志打开一段时间,以便定位问题。通过接口日志分析,发现同一个 HTTP 请求消息,当发生问题时,业务 ChannelHandler 的 channelReadComplete 方法会被调用多次,但是大部分消息都是调用一次,按照业务的设计初衷,当服务端读取到一个完整的 HTTP 请求消息之后,在 channelReadComplete 方法中进行业务逻辑处理。如果一个请求消息它的 channelReadComplete 方法被调用多次,则业务逻辑就会出现异常。
通过对客户端请求消息和 Netty 框架源码分析,找到了问题根因:TCP 底层并不了解上层业务数据的具体含义,它会根据 TCP 缓冲区的实际情况进行包的拆分,所以在业务上认为一个完整的 HTTP 报文可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送。导致数据报拆分和重组的原因如下:
应用程序 write 写入的字节大小大于套接口发送缓冲区大小。
进行 MSS 大小的 TCP 分段。
以太网帧的 payload 大于 MTU 进行 IP 分片。
开启了 TCP Nagle’s algorithm。
由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
消息定长,例如每个报文的大小为固定长度 200 字节,如果不够,空位补空格。
在包尾增加回车换行符(或者其它分隔符)进行分割,例如 FTP 协议。
将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度。
对于 HTTP 请求消息,当业务并发量比较大时,无法保证一个完整的 HTTP 消息会被一次全部读取到服务端。当采用 chunked 方式进行编码时,HTTP 报文也是分段发送的,此时服务端读取到的也不是完整的 HTTP 报文。为了解决这个问题,Netty 提供了 HttpObjectAggregator,保证后端业务 ChannelHandler 接收到的是一个完整的 HTTP 报文,相关示例代码如下所示:
*//** 代码省略...*
*ChannelPipeline p = ...;*
*p.addLast("decoder", new HttpRequestDecoder());*
*p.addLast("encoder", new HttpResponseEncoder());*
*p.addLast("aggregator", new HttpObjectAggregator(10240));*
*p.addLast("service", new ServiceChannelHandler());*
*//** 代码省略...*
通过 HttpObjectAggregator 可以保证当 Netty 读取到完整的 HTTP 请求报文之后才会调用一次业务 ChannelHandler 的 channelRead 方法,无论这条报文底层经过了几次 SocketChannel 的 read 调用。但是对于 channelReadComplete 方法,它并不是业务语义上的读取消息完成之后触发,而是每次从 SocketChannel 成功读取到消息之后,系统就会触发对 channelReadComplete 方法的调用,也就是说如果一个 HTTP 消息被 TCP 协议栈发送了 N 次,则服务端的 channelReadComplete 方法就会被调用 N 次。
在灰度测试环境中,由于客户端并没有采用 chunked 的编码方式,并发压力也不是很高,所以一直没有发现该问题,到了生产环境有些客户端采用了 chunked 方式发送 HTTP 请求消息,客户端并发量也比较高,所以触发了服务端 BUG。
ChannelHandler 由 ChannelPipeline 触发,业务经常使用的方法包括 channelRead 方法、channelReadComplete 方法和 exceptionCaught 方法等,它的使用比较简单,但是里面还是有一些容易出错的地方,使用不当就会导致诸如上述案例中的问题。
对于 channelReadComplete 方法的调用,很容易误认为前面已经增加了对应协议的编解码器,所以只有消息解码成功之后才会调用 channelReadComplete 方法。实际上它的调用与用户是否添加协议解码器无关,只要对应的 SocketChannel 成功读取到了 ByteBuf,它就会被触发,相关代码如下所示(NioByteUnsafe 类):
*public final void read() {*
*//** 代码省略...*
*try {*
*do {*
*byteBuf = allocHandle.allocate(allocator);*
*allocHandle.lastBytesRead(doReadBytes(byteBuf));*
*if (allocHandle.lastBytesRead() <= 0) {*
*byteBuf.release();*
*byteBuf = null;*
*close = allocHandle.lastBytesRead() < 0;*
*if (close) {*
*readPending = false;*
*}*
*break;*
*}*
*allocHandle.incMessagesRead(1);*
*readPending = false;*
*pipeline.fireChannelRead(byteBuf);*
*byteBuf = null;*
*} while (allocHandle.continueReading());*
*allocHandle.readComplete();*
*pipeline.fireChannelReadComplete();*
*//** 代码省略...*
*}*
对于大部分的协议解码器,例如 Netty 内置的 ByteToMessageDecoder,它会调用具体的协议解码器对 ByteBuf 做解码,只有解码成功之后,才会调用后续 ChannelHandler 的 channelRead 方法,代码如下所示(ByteToMessageDecoder 类):
*static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {*
*for (int i = 0; i < numElements; i ++) {*
*ctx.fireChannelRead(msgs.getUnsafe(i));*
*}*
*}*
但是对于 channelReadComplete 方法则是透传调用,即无论是否有完整的消息被解码成功,只要读取到消息,都会触发后续 ChannelHandler 的 channelReadComplete 方法调用,代码如下所示(ByteToMessageDecoder 类):
*public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {*
*numReads = 0;*
*discardSomeReadBytes();*
*if (decodeWasNull) {*
*decodeWasNull = false;*
*if (!ctx.channel().config().isAutoRead()) {*
*ctx.read();*
*}*
*}*
*ctx.fireChannelReadComplete();*
*}*
ChannelPipeline 以链表的方式管理某个 Channel 对应的所有 ChannelHandler,需要说明的是下一个 ChannelHandler 的触发需要在当前 ChannelHandler 中显式调用,而不是自动触发式调用,相关代码示例如下(SslHandler 类):
*public void channelActive(final ChannelHandlerContext ctx) throws Exception {*
*if (!startTls) {*
*startHandshakeProcessing();*
*}*
*ctx.fireChannelActive();*
*}*
如果遗忘了调用 ctx.fireChannelActive 方法,则 SslHandler 后续的 ChannelHandler 的 channelActive 方法将不会被执行,职责链执行到 SslHandler 就会中断。
Netty 内置的 TailContext 有时候会执行一些系统性的清理操作,例如当 channelRead 方法执行完成,将请求消息(例如 ByteBuf)释放掉,防止因为业务遗漏释放而导致内存泄漏(内存池模式下会导致内存泄漏),相关代码如下所示(TailContext 类):
*protected void onUnhandledInboundMessage(Object msg) {*
*try {*
*logger.debug(*
*"Discarded inbound message {} that reached at the tail of the pipeline. " +*
*"Please check your pipeline configuration.", msg);*
*} \**finally {***
**ReferenceCountUtil.release(msg);**
**}**
*}*
当执行完业务最后一个 ChannelHandler 时,需要判断是否需要调用系统的 TailContext,如果需要,则通过 ctx.firexxx 方法调用。
通常情况下,在功能测试或者并发压力不大时,HTTP 请求消息可以一次性接收完成,此时 ChannelHandler 的 channelReadComplete 方法会被调用一次,但是当一个整包消息经过多次读取才能完成解码时,channelReadComplete 方法就会被触发调用多次。如果业务的功能正确性依赖 channelReadComplete 方法的调用次数,当客户端并发压力大或者采用 chunked 编码时,功能就会出错。因此,需要熟悉和掌握 Netty 的事件触发机制以及 ChannelHandler 的调用策略,这样才能防止在生成环境踩坑。
作者介绍
李林锋,10 年 Java NIO、平台中间件设计和开发经验,精通 Netty、Mina、分布式服务框架、API Gateway、PaaS 等,《Netty 进阶之路》、《分布式服务框架原理与实践》作者。目前在华为终端应用市场负责业务微服务化、云化、全球化等相关设计和开发工作。联系方式:
新浪微博 Nettying
微信:Nettying
企业组织架构调整,对于技术团队和技术人有什么正面和负面的影响?7 月 12 日深圳 ArchSummit 全球架构师峰会,将邀请业界专家来解答。此外其他专题涵盖微服务、金融架构、数据处理、小程序、产学研结合等话题。邀请阿里、Netflix、百度等公司的技术专家来分享。
点个好看少个 bug 以上是关于Netty 消息接收类故障案例分析的主要内容,如果未能解决你的问题,请参考以下文章