Netty原理和基础

Posted 程序员星宇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty原理和基础相关的知识,希望对你有一定的参考价值。

《高并发实战》之Netty基础和原理

前言


    这周末去参加了个音乐节玩的比较嗨,文章也没做整理,加上最近阿里云主机被攻击,搞得很烦,这几天在处理云主机的问题,后续也会把遇到的木马、挖矿程序整理出来。这篇接着上一篇,对netty基础和原理做个收尾。

Netty原理和基础(二) 交易担保 微信读书 星宇送你无限卡,百万好书免费读

五、详解Channel通道

通过上面的学习,通道是Netty网络中最重要的核心,代表着网络连接。通道是通信的主题,负责同对端进行网络通信,可以写入数据到对端,也可以从对端读取数据。所有有必要堆通道做个整理。

5.1 构造函数

 protected AbstractChannel(Channel parent) { this.parent = parent;//父通道 this.unsafe = this.newUnsafe();//底层的NIO通道,完成实际的NIO操作 this.pipeline = new DefaultChannelPipeline(this);//一个通道一个流水线 }

从构造函数中我们可以明显看出AbstractChannel内部有一个pipeline属性,表示处理器的流水线。初始化通道时,pipeline属性会初始化成DefaultChannelPipeline实例。

其中parent属性表示通道的父通道。对于连接监听通道来说(如NioserverSocketChannel实例),他的父通道就是null,而对于每一条传输通道(如NioSocketChannel实例)他的父通道就是接收到该连接的服务连接监听通道。

几乎所有的通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。

5.2 成员方法

看完构造函数,继续 了解他的成员方法。


  1. ChannelFuture connect(SocketAddress address)



  2. ChannelFuture bind(SocketAddress address)



  3. ChannelFuture close()

    关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( ) 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。



  4. Channel read()

    读取通道数据,并且启动入站处理。具体来说,从内部的JavaNIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。



  5. ChannelFuture write(Object o)

    启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。



  6. Channel flush()

    缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。

5.3 嵌入式通道EmbeddedChannel

这是一个模拟入站和出站的通道,主要用于测试。

Netty原理和基础(二)

六、详解Handler业务处理器

Reactor反应器获取到IO事件后,分发到Handler业务处理器,由handler完成IO操作和业务处理。

整个IO处理的流程包括:通道读数据、数据包解码、业务处理、目标数据编码、数据包写入通道。

黑体字的四个流程是Netty底层负责完成,我们要做的就是业务处理。

前面已经介绍过,从应用程序开发人员的角度来看,有入站和出站两种类型操作。

· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。

· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作。

6.1 入站处理器

到数据进入Netty通道后,netty就会触发入站处理器,这里我们就学习一下ChannelInboundHandler。


  1. channelRegistered

    当通道注册完后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。



  2. channelActive

    当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件



  3. channelRead

    当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件



  4. channelReadComplete

    当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件



  5. channelInactive

    当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件



  6. exceptionCaught

    通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件


6.2 出站处理器

当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。比方说建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作


  1. bind



  2. connect

    连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。



  3. write

    写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。



  4. flush

    腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。



  5. read

    从底层读数据:完成Netty通道从Java IO通道的数据读取。



  6. disConnect

    断开服务器连接



  7. close

    主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道


6.3 通道初始化处理器

通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作,发生在通道开始工作之前。

但是如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。

回顾一下代码

 //5 装配子通道流水线 b.childHandler(new ChannelInitializer<SocketChannel>() { //有连接到达时会创建一个channel protected void initChannel(SocketChannel ch) throws Exception{ // pipeline管理子通道channel中的Handler // 向子channel流水线添加一个handler处理器 ch.pipeline().addLast(new NettyDiscardHandler()); } });

这里面使用了ChannelInitializer。

initChannel()方法是ChannelInitializer定义的一个抽象方法,在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

其他的方法就不介绍了类似于前面的两种处理器,感兴趣的可以自己去我的git上看。

Netty原理和基础(二)

七、详解Pipeline流水线

前面讲到,一条Netty通道需要很多的Handler业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。

关于他的流程感兴趣的可以去我的git上看看,这里就不做详细介绍了。

7.1 ChannelHandlerContext上下文

不管我们定义的是哪种类型的Handler业务处理器,最终它们都是以双向链表的方式保存在流水线中。

在Handler业务处理器被添加到流水线中时,会创建一个通道处理器上下文ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。

Channel、Handler、ChannelHandlerContext三者的关系为:

Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中包裹了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站/出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实参,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。

7.2 截断流水线的处理

在入站/出站的过程中,如果由于业务条件不满足,需要截断流水线的处理,不让处理进入下一站。

出站处理流程只要开始执行,就不能被截断。强行截断的话,Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。

7.3 Handler业务处理器的热拔插

Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器Handler。主要的Handler热拔插方法声明在ChannelPipeline接口中

八、ByteBuf缓冲区

Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。

8.1 优势

与Java NIO的ByteBuffer相比,ByteBuf的优势如下:

· Pooling (池化,这点减少了内存复制和GC,提升了效率)·

· 复合缓冲区类型,支持零复制· 不需要调用flip()方法去切换读/写模式

· 扩展性好,例如StringBuffer

· 可以自定义缓冲区类型

· 读取和写入索引分开

· 方法的链式调用

· 可以进行引用计数,方便重复使用

第一个部分是已用字节,表示已经使用完的废弃的无效字节;

第二部分是可读字节,这部分数据是ByteBuf保存的有效数据从ByteBuf中读取的数据都来自这一部分;

第三部分是可写字节,写入到ByteBuf的数据都会写到这一部分中;

第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。

什么是Pooled(池化)的ByteBuf缓冲区呢?

在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓存池中取出,而不需要重新创建。

8.2 重要属性

这三个属性定义在AbstractByteBuf抽象类中,分别是:

· readerIndex(读指针):指示读取的起始位置

· writerIndex(写指针):指示写入的起始位置。

· maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。

8.3 三组方法


  • 第一组:容量系列:capacity()、maxCapacity()



  • 第二组:写入系列:isWritable() 、writableBytes()、maxWritableBytes()、writeBytes(byte[] src) ......



  • 第三组:读取系列:isReadable( )、readableBytes( )、readType()、getTYPE(TYPE value)

    ......


这里就不做详细介绍了感兴趣的可以去查看源码。

8.4 实践案例

实际使用分三步(1)分配一个ByteBuf实例;(2)向ByteBuf写数据;(3)从ByteBuf读数据。

参考github代码

8.5 引用计数

Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用“计数器”(一种GC算法)来标记对象是否“不可达”进而收回。Netty也使用了这种手段来对ByteBuf的引用进行计数。Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。

九、EchoServer回显服务器的实践案例

功能:从服务器端读取客户端输入的数据,然后将数据直接回显到Console控制台。

public class NettyEchoServer{
private final int serverPort; ServerBootstrap b = new ServerBootstrap();
public NettyEchoServer(int port){ this.serverPort = port; }
public void runServer(){ //创建reactor 线程组 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try { //1 设置reactor 线程组 b.group(bossLoopGroup, workerLoopGroup); //2 设置nio类型的channel b.channel(NioServerSocketChannel.class); //3 设置监听端口 b.localAddress(serverPort); //4 设置通道的参数 b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线 b.childHandler(new ChannelInitializer<SocketChannel>() { //有连接到达时会创建一个channel protected void initChannel(SocketChannel ch) throws Exception{ // pipeline管理子通道channel中的Handler // 向子channel流水线添加一个handler处理器 ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE); } }); // 6 开始绑定server // 通过调用sync同步方法阻塞直到绑定成功 ChannelFuture channelFuture = b.bind().sync(); Logger.info(" 服务器启动成功,监听端口: " + channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束 // 服务监听通道会一直等待通道关闭的异步任务结束 ChannelFuture closeFuture = channelFuture.channel().closeFuture(); closeFuture.sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 8 优雅关闭EventLoopGroup, // 释放掉所有资源包括创建的线程 workerLoopGroup.shutdownGracefully(); bossLoopGroup.shutdownGracefully(); }
}
public static void main(String[] args) throws InterruptedException{ int port = NettyDemoConfig.SOCKET_SERVER_PORT; new NettyEchoServer(port).runServer(); }}

9.1 共享NettyEchoServerHandler处理器

EchoServerHandler回显服务器处理器,继承自ChannelInboundHandlerAdapter,然后覆盖了channelRead方法,这个方法在可读IO事件到来时,被流水线回调。

第一步,从channelRead方法的msg参数。

第二步,调用ctx.channel().writeAndFlush() 把数据写回客户端。

@ChannelHandler.Sharablepublic class NettyEchoServerHandler extends ChannelInboundHandlerAdapter{ public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ ByteBuf in = (ByteBuf) msg; Logger.info("msg type: " + (in.hasArray() ? "堆内存" : "直接内存"));
int len = in.readableBytes(); byte[] arr = new byte[len]; in.getBytes(0, arr); Logger.info("server received: " + new String(arr, "UTF-8"));
//写回数据,异步任务 Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt()); ChannelFuture f = ctx.writeAndFlush(msg); f.addListener((ChannelFuture futureListener) -> { Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt()); }); }}

这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享,如果没有加注解,试图将同一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。

9.2 NettyEchoClient客户端代码

客户端Bootstrap的装配和使用,代码如下:

public class NettyEchoClient{
private int serverPort; private String serverIp; Bootstrap b = new Bootstrap();
public NettyEchoClient(String ip, int port) { this.serverPort = port; this.serverIp = ip; }
public void runClient() { //创建reactor 线程组 EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try { //1 设置reactor 线程组 b.group(workerLoopGroup); //2 设置nio类型的channel b.channel(NioSocketChannel.class); //3 设置监听端口 b.remoteAddress(serverIp, serverPort); //4 设置通道的参数 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线 b.handler(new ChannelInitializer<SocketChannel>() { //有连接到达时会创建一个channel protected void initChannel(SocketChannel ch) throws Exception { // pipeline管理子通道channel中的Handler // 向子channel流水线添加一个handler处理器 ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE); } }); ChannelFuture f = b.connect(); f.addListener((ChannelFuture futureListener) -> { if (futureListener.isSuccess()) { Logger.info("EchoClient客户端连接成功!");
} else { Logger.info("EchoClient客户端连接失败!"); }
});
// 阻塞,直到连接完成 f.sync(); Channel channel = f.channel();
Scanner scanner = new Scanner(System.in); Print.tcfo("请输入发送内容:");
while (scanner.hasNext()) { //获取输入的内容 String next = scanner.next(); byte[] bytes = (Dateutil.getNow() + " >>" + next).getBytes("UTF-8"); //发送ByteBuf ByteBuf buffer = channel.alloc().buffer(); buffer.writeBytes(bytes); channel.writeAndFlush(buffer); Print.tcfo("请输入发送内容:");
} } catch (Exception e) { e.printStackTrace(); } finally { // 优雅关闭EventLoopGroup, // 释放掉所有资源包括创建的线程 workerLoopGroup.shutdownGracefully(); }
}
public static void main(String[] args) throws InterruptedException { int port = NettyDemoConfig.SOCKET_SERVER_PORT; String ip = NettyDemoConfig.SOCKET_SERVER_IP; new NettyEchoClient(ip, port).runClient(); }}

在上面的代码中,客户端在连接到服务器端成功后不断循环,获取控制台的输入,通过服务器端的通道发送到服务器。

9.3 NettyEchoClientHandler处理器

客户端的流水线不是空的,还需要装配一个回显处理器,功能很简单,就是接收服务器写过来的数据包,显示在Console控制台上。代码如下:

public class NettyDumpSendClient{
private int serverPort; private String serverIp; Bootstrap b = new Bootstrap();
public NettyDumpSendClient(String ip, int port){ this.serverPort = port; this.serverIp = ip; }
public void runClient(){ //创建reactor 线程组 EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try { //1 设置reactor 线程组 b.group(workerLoopGroup); //2 设置nio类型的channel b.channel(NioSocketChannel.class); //3 设置监听端口 b.remoteAddress(serverIp, serverPort); //4 设置通道的参数 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线 b.handler(new ChannelInitializer<SocketChannel>() { //有连接到达时会创建一个channel protected void initChannel(SocketChannel ch) throws Exception{ // pipeline管理子通道channel中的Handler // 向子channel流水线添加一个handler处理器 ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE); } }); ChannelFuture f = b.connect(); f.addListener((ChannelFuture futureListener) -> { if (futureListener.isSuccess()) { Logger.info("EchoClient客户端连接成功!");
} else { Logger.info("EchoClient客户端连接失败!"); }
});
// 阻塞,直到连接完成 f.sync(); Channel channel = f.channel();
//6发送大量的文字 byte[] bytes = "疯狂创客圈:高性能学习社群!".getBytes(Charset.forName("utf-8")); for (int i = 0; i < 1000; i++) { //发送ByteBuf ByteBuf buffer = channel.alloc().buffer(); buffer.writeBytes(bytes); channel.writeAndFlush(buffer); }

// 7 等待通道关闭的异步任务结束 // 服务监听通道会一直等待通道关闭的异步任务结束 ChannelFuture closeFuture = channel.closeFuture(); closeFuture.sync();
} catch (Exception e) { e.printStackTrace(); } finally { // 优雅关闭EventLoopGroup, // 释放掉所有资源包括创建的线程 workerLoopGroup.shutdownGracefully(); }
}
public static void main(String[] args) throws InterruptedException{ int port = NettyDemoConfig.SOCKET_SERVER_PORT; String ip = NettyDemoConfig.SOCKET_SERVER_IP; new NettyDumpSendClient(ip, port).runClient(); }

通过代码可以看到,从服务器端发送过来的ByteBuf,被手动方式强制释放掉了。当然,也可以使用前面介绍的自动释放方式来释放ByteBuf。

结束语

    通过这几天的学习,收获还是不少了解了Netty基本原理:Reactor反应器模式在Netty中的应用,Netty中Reactor反应器、Handler业务处理器、Channel通道以及它们三者之间的相互关系。除此之外还学习了Pipeline流水线,他是Netty为了有效地管理通道和Handler业务处理器之间的关系。

这一节的内容还是挺多的,我自己读起来有时候也感觉晕晕的,但是其实读完把思路整理一下无外乎就几个主要的部分,反应器,通道,选择器,流水线等,建议跟着代码一起学习,一边debug一边理解他的执行流程和每个部分的作用,会更加直观简单。


我是星宇,一个满头黑发,渴望秃头的开发,我们下期见!

    






以上是关于Netty原理和基础的主要内容,如果未能解决你的问题,请参考以下文章

Netty基础必备知识,ByteBuffer和ByteBuf底层原理

Java高阶必备之Netty基础原理

Netty优雅退出机制和原理

精尽 Netty 原理与源码专栏( 已经完成 61+ 篇,预计总共 70+ 篇 )

画了 45 张图深度解析 Netty 架构与原理

Netty原理实践解析