彻底搞懂 Netty 线程模型

Posted 一角钱技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了彻底搞懂 Netty 线程模型相关的知识,希望对你有一定的参考价值。

前言

在学习Netty 之前我们最好先掌握 BIO、NIO、AIO 基础知识,前面我们已经花了三篇文章去讲这些知识。我们开始来学习 Netty 的具体知识了,本文就Netty线程模型展开分析。

基本概念

IO 模型

  • BIO:同步阻塞模型;
  • NIO:基于IO多路复用技术的“非阻塞同步”IO模型。简单来说,内核将可读可写事件通知应用,由应用主动发起读写事件;
  • AIO:非阻塞异步IO模型。简单来说,内核将读完成事件通知应用,读操作由内核完成,应用只需要操作数据即可;应用做异步写操作时立即返回,内核会进行写操作排队并执行写操作。

NIO 和 AIO 不同之处在于应用是否进行真正的读写操作。

reactor 和 proactor 模型

  • reactor:基于NIO技术,可读可写时通知应用;
  • proactor:基于AIO技术,读完成时通知应用,写操作应用通知内核。

Netty认识

Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等。

通过前面 NIO 的学习可以看到,NIO 的类库和API 繁杂,例如 SelectorServerSocketChannelSocketChannelByteBuffer等这些对于从事应用层的程序员来说,使用起来开发工作量和难度都非常大。另外客户端面临断连重连、 网络闪断、心跳处理、半包读写、 网络拥塞 和异常流的处理等等。

Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

Netty 现在都在用的是4.x,5.x版本已经废弃,Netty 4.x 需要JDK 6以上版本支持。

在了解Netty使用场景后,本节将从IO模型的演进角度来分析Netty线程模型,通过并发编程之父Doug Lea所写《Scalable IO in Java》中涉及的一些IO处理模式,一步一步深入理解Netty线程模型的“进化历史”。

《Scalable IO in Java》:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Netty使用场景

  1. 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 使用 Dubbo 协议进行通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信;消息中间件Rocketmq底层也是用的Netty作为基础通信组件。
  2. 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言都得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
  3. 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它的 Netty Service 是基于 Netty 框架二次封装实现。

Netty相关开源项目:https://netty.io/wiki/related-projects.html

IO处理模式演进

基本上所有的网络处理程序都遵循以下基本的处理(handler)流程:

  1. Read request (接收二进制数据)
  2. Decode request (解码为可读数据)
  3. Process service (对数据进行处理产生结果)
  4. Encode reply (将结果编码为二进制数据)
  5. Send reply (返回结果)

传统网络服务器会为每一个连接的处理开启一个新的线程,即我们前面所说的BIO模型(多线程模式),我们可以看下大致的示意图:彻底搞懂 Netty 线程模型

上图为 BIO版本

BIO模型对于每一个请求都分发给一个线程(可以理解为一个handler),每个handler中都独自处理上面1-5流程。这种模型的适用场景和瓶颈可以查看。

改进:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理(非阻塞)。这就是对应的NIO线程模型。彻底搞懂 Netty 线程模型

上图为单线程版事件驱动模型,可以理解为NIO单线程版本

上面用到了 Reactor 模式。关于 Reactor 模式的两个概念:

  • Reactor:负责响应 IO 事件,当检测到一个新的事件,将其发送给相应的 Handler 去处理。
  • Handler:负责处理非阻塞的行为,标识系统管理的资源,同时将 Handler 与事件绑定。

注意:Reactor 为单个线程,如上图所示:不仅需要处理客户端的 accept 连接请求,同时也要负责分发(dispatch)读写请求到处理器中。由于只有单个线程处理各种请求,所以要求处理器中的业务需要能够快速处理完。

改进:现在的服务器基本上是多核 CPU,那么在多处理器场景下,为实现服务的高性能我们可以有目的的采用多线程模式处理业务。彻底搞懂 Netty 线程模型

上图为多线程版本事件驱动模型,可以理解为NIO多线程版本

通过与NIO单线程模型相比,增加了worker线程池,专门用于处理非IO操作(decode、compute、encode),大大提高了工作效率。

这种模型下,客户端发送过来的连接和注册还是由主线程 Reactor 统一去处理,只不过客户端连接成功后的后续事件分发给 worker 线程池去处理。

但是,当客户端短时间内几十万或者上百万条连接请求的时候(双十一、春运抢票),单个 Rector 不仅要处理注册事件,也要同时分发任务到 worker 线程池,由于分发也是比较耗时的操作,有可能会导致阻塞。

继续改进:将Reactor 拆分为两部分彻底搞懂 Netty 线程模型

上图为主从NIO模型

如上图所示,在这种模型下,mainReactor 专门负责新客户端的连接操作,建立通道(channel),然后将一定事件内的 channel 注册到另外一个 subReactor,subReactor 负责将客户端的读写请求交给线程池处理。这样即使几十万个请求同时到来也无所谓了。

通俗理解,mainReactor就是大总管,只负责接口,subReactor就是一个员工,负责给总管接待客户提供服务。

Netty线程模型

通过对 《 Scalable IO in Java 》里的一些 IO 处理模式理解, Netty的线程模型就是由上面主从NIO模型演变来的,是基于Reactor模型的。

如下图所示,Boos Group 就是上面提到的 mainReactor,与上面不同在于 Worker Group,它可以理解为一组 subReactor,即在大总管下面有多个员工来干活,每次接收的客户都均匀分配给不同员工。Netty 之所以单机支持百万级别并发量,就是因为一主多从的线程模型。彻底搞懂 Netty 线程模型

需要说明的是,Netty 的线程模型并不是一成不变的。它通常采用一主多从,但是也可以根据实际需要配置启动参数,通过设置不同的启动参数,Netty 可以同时支持 “多主多从”。

下面是对上图“一主多从” Netty 模型的详细解释:

  1. Netty 抽象除两组线程池 BossGroup 和 WorkerGroup,BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写。
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup。
  3. NioEventLoopGroup 相当于一个事件循环线程组,这个组中含有多个事件循环线程,每一个事件循环线程是 NioEventLoop。
  4. 每个 NioEventLoop 都有一个 selector,用于监听注册在其上的 socketChannel 的网络通讯。
  5. 每个 Boss NioEventLoop 线程内部循环执行的步骤有 3 步:
    1. 处理accept事件,与 client 建立连接,生成 NiosocketChannel;
    2. 将NioSocketChannel 注册到某个 worker NioEventLoop 上的 selector;
    3. 处理任务队列的任务,即runAllTasks。
  6. 每个 worker NIOEvent'Loop线程循环执行的步骤:
    1. 轮询注册到最近的 selector 上所有的 NioSocketChannel 的 read、write 事件;
    2. 处理 I/O 事件,即 read、write事件,在对应的NioScoketChannel 处理业务;
    3. runAllTask 处理任务队列 TaskQueue 的任务,一些耗时的业务处理一般可以放入 TaskQueue 中,这样不影响数据在 pipeline 中的流动处理。
  7. 每个 worker NIOEventLoop 处理 NioSocketChannel 业务时,会使用 pipeline (管道),管道中维护来很多 handler 处理器用来处理 channel 中的数据。

Netty 模块组件

Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty程序,通过链式调用串联各个组件。Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端 启动引导类。

Future、ChannelFuture

正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

Channel

Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  1. 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  2. 网络连接的配置参数 (例如接收缓冲区大小)
  3. 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
  4. 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
  5. 支持关联 I/O 操作与对应的处理程序。不同协议、不同的阻塞类型的连接都有不同的Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。(最常用)
NioServerSocketChannel,异步的服务器端 TCP Socket 连接
NioDatagramChannel,异步的 UDP 连接
NioSctpChannel,异步的客户端 Sctp 连接
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector

Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

NioEventLoop

NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  1. I/O 任务即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  2. 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

NioEventLoopGroup

NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

ChannelHandler

ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

ChannelInboundHandler 用于处理入站 I/O 事件
ChannelOutboundHandler 用于处理出站 I/O 操作

或者使用以下适配器类:

ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。

ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

ChannelPipline

保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:彻底搞懂 Netty 线程模型一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

Netty通讯示例

Netty的maven依赖

<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>
</dependencies>

服务端代码

package com.niuh.netty.base;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

public static void main(String[] args) throws Exception {
//创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>()
{//创建通道初始化对象,设置初始化参数

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
//给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/

//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

服务端所注册的自定义回调函数 NettyServerHandler:

package com.niuh.netty.base;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
}

/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}

/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

这个类继承了ChannelInboundHandlerAdapter的几个方法:

  • channelRead方法:当客户端与服务端连通好之后,客户端发数据时,服务端会主动调用这个方法。
  • channelReadComplete方法:数据处理完毕的方法,ctx.writeAndFlush()就可以往客户端写回数据了。

客户端代码

package com.niuh.netty.base;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler());
}
});

System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

客户端自定义回调函数:

package com.niuh.netty.base;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}

//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址:" + ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

与NettyServerHandler类似,其中的channelActi() 方法是当客户端与服务器连接完成时候就会执行的方法。

看完代码,我们发现Netty架的目标就是让你的业务逻辑从网络基础应用编码中分离出来,让你可以专 注业务的开发,而不需写一大堆类似NIO的网络处理操作。

ByteBuf 理解

从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息。

ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。

  • 当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增。
  • 同样,当写 ByteBuf 时,它的 writerIndex 也会根据写入的字节数进行递增。
彻底搞懂 Netty 线程模型
ByteBuf.png

需要注意的是极限的情况是 readerIndex 刚好读到了 writerIndex 写入的地方。如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf-BoundsException 异常。

示例代码

package com.niuh.netty.base;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class NettyByteBuf {
public static void main(String[] args) {
// 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
// 通过readerindex和writerIndex和capacity,将buffer分成三个区域
// 已经读取的区域:[0,readerindex)
// 可读取的区域:[readerindex,writerIndex)
// 可写的区域: [writerIndex,capacity)
ByteBuf byteBuf = Unpooled.buffer(10);
System.out.println("byteBuf=" + byteBuf);

for (int i = 0; i < 8; i++) {
byteBuf.writeByte(i);
}
System.out.println("byteBuf=" + byteBuf);

for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.getByte(i));
}
System.out.println("byteBuf=" + byteBuf);

for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.readByte());
}
System.out.println("byteBuf=" + byteBuf);


//用Unpooled工具类创建ByteBuf
ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhangsan!", CharsetUtil.UTF_8);
//使用相关的方法
if (byteBuf2.hasArray()) {
byte[] content = byteBuf2.array();
//将 content 转成字符串
System.out.println(new String(content, CharsetUtil.UTF_8));
System.out.println("byteBuf=" + byteBuf2);

System.out.println(byteBuf2.readerIndex()); // 0
System.out.println(byteBuf2.writerIndex()); // 12
System.out.println(byteBuf2.capacity()); // 36

System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104

int len = byteBuf2.readableBytes(); //可读的字节数 12
System.out.println("len=" + len);

//使用for取出各个字节
for (int i = 0; i < len; i++) {
System.out.println((char) byteBuf2.getByte(i));
}

//范围读取
System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
}
}
}

PS:以上代码提交在 Github :https://github.com/Niuh-Study/niuh-netty.git

文章持续更新,可以微信搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

部分图片来源于网络,版权归原作者,侵删。

以上是关于彻底搞懂 Netty 线程模型的主要内容,如果未能解决你的问题,请参考以下文章

昨天,我彻底搞懂了Netty内存分配策略!

Java中的线程池如何实现,一文彻底搞懂

Java并发系列终结篇:彻底搞懂Java线程池的工作原理

一文让你彻底搞懂多线程

Netty基础系列 --彻底理解NIO

彻底搞懂标准盒模型和怪异盒模型