Netty网络编程框架的核心概念以及入门案例
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty网络编程框架的核心概念以及入门案例相关的知识,希望对你有一定的参考价值。
详细介绍了Netty网络编程框架的核心概念以及入门案例。
文章目录
1 Netty的介绍
- 基于事件驱动的Java NIO网络通信框架,可以快速简单地开发网络应用程序。
- 极大地简化并优化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。
- 支持多种通信协议 如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议,同样支持自定义协议。
简单的说,Netty有三个优点:
- 高并发:基于 NIO开发(Reactor模型),并发性能相比BIO得到了很大提高。
- 传输快:传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,使用高性能序列化协议protobuf,实现了高效传输。
- 封装好:封装了原始NIO编程的很多细节,提供了易于使用调用接口,使用更简单。
借用官方的描述:Netty 成功地找到了一种在不妥协可维护性和性能的情况下实现易于开发,性能,稳定性和灵活性的方法。
Netty的社区目前非常活跃。很多涉及到网络调用的开源项目和框架底层都用到了Netty,比如我们常用的 Dubbo、RocketMQ、Elasticsearch、gRPC、Spark、GateWay等等。
总之,涉及到网络编程开发时,比如即时通讯系统、自定义RPC框架、自定义HTTP服务器、实时消息推送系统等场景下,用Netty,准没错。
2 Netty的核心组件
2.1 Channel
通道,Netty网络操作抽象类,包括基本的 I/O 操作,如 bind、connect、read、write 等,Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型:
- NiosocketChannel,异步的客户端 TCP Socket 连接。
- NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
- NioDatagramChannel,异步的 UDP 连接。
- NioSctpChannel,异步的客户端 Sctp 连接。
- NioSctpServerChannel,异步的 Sctp 服务器端连接 这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO。
2.2 EventLoop
EventLoop(事件循环)接口是Netty的核心接口,用于处理连接的生命周期中所发生的各种事件,实际上就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
EventLoop内部持有NIO中的Selector,Channel将会注册到EventLoop中,一个EventLoop可以监听多个Channel,EventLoop是实现IO多路复用的核心,可以看作是Reactor模型中的mainReactor。
Channel 为 Netty 网络操作抽象类,EventLoop 负责监听注册到其上的Channel的IO事件,两者配合完成 I/O 操作。
2.3 ChannelFuture
在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理。
Channel会注册到EventLoop中后会立即返回一个ChannelFuture对象,可以通过ChannelFuture#addListener注册GenericFutureListener监听器,当操作执行成功或失败时监听会自动触发注册的监听事件。
2.4 ChannelHandler
ChannelHandler 是消息的具体处理器。他负责处理各种任务,这个任务非常广泛,可以是读写事件、连接、解码编码、数据转换、业务逻辑等等,处理完毕之后将数据继续转发到ChannelPipeline中的下一个ChannelHandler。
通过定制ChannelHandler可对Netty进行扩展。ChannelHandler接口本身并没有提供很多方法,因为这个接口有许多的方法需要实现,为了方便使用,可以继承它的子类:
- ChannelInboundHandler用于处理入站I/O事件
- ChannelOutboundHandler用于处理出站I/O操作
或者使用以下适配器类,更加方便:
- ChannelInboundHandlerAdapter用于处理入站I/O事件
- ChannelOutboundHandlerAdapter用于处理出站I/O操作
- ChannelDuplexHandler用于处理入站和出站事件
2.5 ChannelPipeline
ChannelPipeline 是一个 ChannelHandler 的链表,即ChannelHandler组成的List,提供了一个沿着链传播入站和出站事件流的 API。
可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。
在执行时,入站事件会从链表head往后传递到最后一个入站的handler(ChannelInboundHandler类型),出站事件会从链表tail往前传递到最前一个出站的handler(ChannelOutboundHandler类型),两种类型的handler在执行时互不干扰。如果Handler同时属于入站、出站Handler,则都会执行一次。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。
2.5.1 ChannelHandlerContext
用于传输业务数据,保存Channel相关的所有上下文信息。
将Handler和Pipeline联系起来,实际上ChannelPipeline中直接存储的是ChannelHandlerContext,而每个 ChannelHandlerContext 中又关联着唯一一个 ChannelHandler。
2.5.2 入站和出站
数据入站,一般是指读事件触发,即数据要读进来;数据从底层的Java NIO channel读取到Netty的Channel,此过程中会进行数据解码。
数据出站,一般是指写事件触发,即数据要写出去;数据从Netty的Channel写入底层的 Java NIO chanel,此过程中会进行数据编码。
入站会从先读取数据,再执行入站的Handler;出站会先执行出站的Handler,再写入。
即每次出现读事件时,会执行入站操作,实际读取数据之后,会先从头至尾依次调用ChannelPipeline 中的InboundHandler处理,不会调用OutboundHandler;而触发写事件时,会执行出站操作,实际写入数据之前,则会从尾到头依次调用ChannelPipeline的OutboundHandler处理,不会调用InboundHandler;
下图描述了 ChannelPipeline 中的 ChannelHandlers 通常如何处理 I/O 事件(https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html):
入站事件由入站处理程序按自下而上的方向处理,如图左侧所示。入站处理程序通常处理由图底部的 I/O 线程生成的原始入站数据,例如通过 SocketChannel.read(ByteBuffer)读取。
出站事件由出站处理程序按自上而下的方向处理,如图右侧所示。出站处理程序通常会生成或转换出站流量,例如写入请求。如果出站事件超出了底部出站处理程序,则由与通道关联的 I/O 线程处理。I/O 线程执行实际的输出操作,例如通过 SocketChannel.write(ByteBuffer)输出。
2.6 EventLoopGroup
EventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环EventLoop, EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
EventLoopGroup内部的每个EventLoop通常包含1个Selector和1个事件循环线程,一个EventLoop可以绑定多个Channel,但一个Channel只能绑定一个EventLoop,这样某一个连接的IO事件就在专有的线程上处理,保证线程安全。
Netty Server端包含1个Boss NioEventLoopGroup和1个Worker NioEventLoopGroup:
- Boss NioEventLoop主要循环执行的工作:
- select监听accept事件。
- 处理到来的accept事件,与Client建立连接,生成SocketChannel,并将SocketChannel注册到某个Worker NioEventLoop的Selector上。
- 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。
- Worker NioEventLoop主要循环执行的工作:
- select监听read、write事件。
- 处理到来的read、write事件,在NioSocketChannel可读、可写事件发生时进行处理。
- 处理任务队列中的任务,runAllTasks。
3 Netty的线程模型
Netty通过Reactor模型基于多路复用器接收并处理用户请求,内部实现了两个线程池,boss线程池和work线程池,其中boss线程池的线程负责处理请求的accept事件,当接收到accept事件的请求时,就会建立连接并把对应的socket封装到一个NioSocketChannel中,并交给work线程池,其中work线程池负责请求的read和write事件,以及业务逻辑,这些都由对应的Handler处理。
Netty 主要靠 NioEventLoopGroup 线程池的配置来实现具体的线程模型。
3.1 单线程模型
bossGroup和workerGroup使用同一个NioEventLoopGroup,且配置线程数为1。
适合连接量和并发量都不大的应用。
3.2 多线程模型
bossGroup和workerGroup使用不同NioEventLoopGroup,且bossGroup配置线程数为1。
适合连接量不大,并发量大的应用。
3.3 主从多线程模型
bossGroup和workerGroup使用不同NioEventLoopGroup,且都配置为多线程。
适合连接量和并发量都比较大的应用。
从一个主线程 NIO 线程池中选择一个线程作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程负责后续的接入认证等工作。连接建立完成后分派给workerGroup线程。
4 Netty默认启动的线程数
EventLoopGroup 默认的构造函数实际会起的线程数为 CPU核心数*2,但bossGroup一般设置数量为1。EventLoopGroup内部的EventLoop数量就是线程数量,保证1对1的关系。
5 Netty的启动过程
5.1 服务端
首先初始化两个NioEventLoopGroup,其中boosGroup用于处理客户端建立TCP连接的请求(Accept事件), workerGroup用于处理每一条连接的I/O读写事件和具体的业务逻辑。
NioEventLoopGroup 类的无参构造函数的默认设置的线程数量是 CPU 核心数 *2 。一般情况下我们会指定 bossGroup 的 线程数为 1(并发连接量不大的时候) ,workGroup 的线程数量为 CPU 核心数 *2 。
随后创建一个ServerBootstrap,它是服务端的启动引导类/辅助类,它将引导我们进行服务端的启动工作。通过ServerBootstrap配置EventLoopGroup、Channel类型,连接参数、配置入站、出站事件handler等。
最后通过bind()方法绑定端口,开始工作。
public class NettyServer {
static int port = 8888;
public static void main(String[] args) {
//1 bossGroup 用于接收连接 mainReactor
//workerGroup 用于具体的处理 subReactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
serverBootstrap
.group(bossGroup, workerGroup)
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑Handler
p.addLast(new HelloServerHandler());
p.addLast(…………);
}
});
// 6.绑定端口,调用 sync 方法阻塞直到绑定完成
ChannelFuture f = serverBootstrap.bind(1234).sync();
// 7.阻塞等待直到服务器Channel关闭(closeFuture()方法获取Channel 的CloseFuture对象,然后调用sync()方法)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//8.优雅关闭相关线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
5.2 客户端
首先初始化一个NioEventLoopGroup。
随后创建一个Bootstrap,它是客户端的启动引导类/辅助类,它将引导我们进行客户端的启动工作。通过Bootstrap配置EventLoopGroup、Channel类型,连接参数、配置入站、出站事件handler等。
最后通过connect()方法使用服务端的ip和port进行连接,开始工作。
public class NettyClient {
static int port = 8888;
static String host = "127.0.0.1";
public static void main(String[] args) {
//1.创建一个 NioEventLoopGroup 对象实例
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动引导/辅助类:Bootstrap
Bootstrap bootstrap = new Bootstrap();
//3.指定线程组
bootstrap.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 5.这里可以自定义消息的业务处理逻辑
p.addLast(new HelloClientHandler(message));
p.addLast(…………);
}
});
// 6.尝试建立连接
ChannelFuture f = bootstrap.connect(host, port).sync();
// 7.等待连接关闭(阻塞,直到Channel关闭)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
6 TCP 粘包/拆包的原因以及解决办法
6.1 原因
TCP是以流的方式来处理数据,底层会有一个缓冲区,一个完整的较大的包可能会被TCP拆分成多个包进行发送,也可能把多个小的包封装成一个大的数据包发送。
TCP粘包/拆包的原因:应用程序写入的字节大小大于套接字发送缓冲区的大小,会发生拆包现象,实际表现就是不能收到完整的消息。而应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包现象,实际表现就是一次性收到多条粘连在一起消息。
报头的选项字段有MSS(Maximum Segment Size,最大报文段大小)字段,规定一个TCP包最大可传输的字节数,一般是1500-20-20=1460字节,大于该大小时将发生拆包。
6.2 解决办法
- 使用 Netty 自带的解码器
- LineBasedFrameDecoder : 发送端发送数据包的时候,每个数据包之间以换行符作为分隔,即\\n或者\\r\\n,其工作原理是它依次遍历 ByteBuf 中的可读字节,判断是否有换行符,然后进行相应的截取。
- DelimiterBasedFrameDecoder : 可以自定义分隔符解码器,其实际上是一种特殊的DelimiterBasedFrameDecoder 解码器。
- FixedLengthFrameDecoder: 固定长度解码器,它能够按照指定的长度对消息进行相应的拆包。需要约定每一个包的固定大小。
- LengthFieldBasedFrameDecoder:将消息分为消息头和消息体。在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。
- 通过自定义协议进行粘包和拆包的处理。
7 Netty的长连接、心跳机制
Netty客户端和服务器采用长连接保持联系。client 与 server 完成一次读写,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。长连接的可以省去较多的 TCP 建立和关闭的操作,降低对网络资源的依赖,节约时间。
在 TCP 保持长连接的过程中,可能会出现断网等网络异常出现,异常发生的时候, client 与 server 之间如果没有交互的话,它们是无法发现对方已经掉线的。为了解决这个问题, 我们就需要引入心跳机制 。
心跳机制的工作原理是: 在 client 与 server 之间在一定时间内没有数据交互时, 即处于 idle 状态时,客户端或服务器就会发送一个特殊的数据包给对方,当接收方收到这个数据报文后,也立即发送一个特殊的数据报文,回应发送方,此即一个 PING-PONG 交互。所以,当某一端收到心跳消息后,就知道了对方仍然在线,这就确保 TCP 连接的有效性。
TCP 实际上自带的就有长连接选项,本身是也有心跳包机制,也就是 TCP 的选项:SO_KEEPALIVE。但是,TCP 协议层面的长连接灵活性不够。所以,一般情况下我们都是在应用层协议上实现自定义心跳机制的,也就是在 Netty 层面通过编码实现。通过 Netty 实现心跳机制的话,核心类是 IdleStateHandler 。
Netty支持的哪些心跳类型设置:
- readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)。
- writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)。
- allIdleTime:所有类型的超时时间。
8 Netty 的零拷贝
零复制(英语:Zero-copy;也译零拷贝)技术是指计算机执行操作时,CPU 不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省 CPU 周期和内存带宽。
Netty 中的零拷贝体现在以下几个方面:
- Netty 提供了CompositeByteBuf类,可以将多个ByteBuf合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的数据拷贝。
- ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝。
- 通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输,可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。
- Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
9 Netty 和 Tomcat 的区别
作用不同:Tomcat 是 Servlet 容器,可以视为 Web 服务器,是一款已经开发好的软件,而 Netty 是一款强大的异步事件驱动的网络应用程序框架,用于简化网络编程,可用于编写各种服务器。
协议不同:Tomcat 是基于 http 协议的 Web 服务器,而 Netty 支持各种现成的协议并且能通过编程自定义各种协议,因为 Netty 本身自己能编码/解码字节流,所以Netty 可以实现HTTP 服务器、FTP 服务器、UDP 服务器、RPC 服务器、WebSocket 服务器、Redis 的 Proxy 服务器、mysql 的 Proxy 服务器等等。
10 Netty简单案例
client:
public class NettyClient {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建一个 NioEventLoopGroup 对象实例
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.创建客户端启动引导/辅助类:Bootstrap
Bootstrap bootstrap = new Bootstrap();
//3.指定线程组
bootstrap.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 5.这里可以自定义消息的业务处理逻辑
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
});
// 6.尝试建立连接
ChannelFuture f = bootstrap.connect("localhost", 8888).sync();
Channel channel = f.channel();
// 7.等待连接关闭(阻塞,直到Channel关闭)
//channel.closeFuture().sync();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
String s = br.readLine();
channel.writeAndFlush(s + "\\r\\n");
if ("bye".equals(s)) {
break;
}
}
} finally {
group.shutdownGracefully();
}
}
}
ClientHandler:
/**
* @author lx
*/
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
NettyServer:
public class NettyServer {
public static void main(String[] args) {
//1 bossGroup 用于接收连接 mainReactor
//workerGroup 用于具体的处理 subReactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.创建服务端启动引导/辅助类:ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.给引导类配置两大线程组,确定了线程模型
serverBootstrap
.group(bossGroup, workerGroup)
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//5.可以自定义客户端消息的业务处理逻辑Handler
pipeline.addLast(new DelimiterBasedFrameDecoder(4096,Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ServerHandler());
}
});
// 6.绑定端口,调用 sync 方法阻塞直到绑定完成
ChannelFuture f = serverBootstrap.bind(8888).sync();
// 7.阻塞等待直到服务器Channel关闭(closeFuture()方法获取Channel 的CloseFuture对象,然后调用sync()方法)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//8.优雅关闭相关线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerHandler:
/**
* @author lx
*/
public class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 读取请求
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress());
System.out.println("from client: " + msg);
double v = ThreadLocalRandom.current().nextDouble();
channel.writeAndFlush("from server: " + v + " \\r\\n");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress() + "加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress() + "离开");
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " 以上是关于Netty网络编程框架的核心概念以及入门案例的主要内容,如果未能解决你的问题,请参考以下文章