Netty原理和基础
Posted 程序员星宇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty原理和基础相关的知识,希望对你有一定的参考价值。
《高并发实战》之Netty基础和原理
前言
这周末去参加了个音乐节玩的比较嗨,文章也没做整理,加上最近阿里云主机被攻击,搞得很烦,这几天在处理云主机的问题,后续也会把遇到的木马、挖矿程序整理出来。这篇接着上一篇,对netty基础和原理做个收尾。
交易担保 微信读书 星宇送你无限卡,百万好书免费读 Mini Program
五、详解Channel通道
通过上面的学习,通道是Netty网络中最重要的核心,代表着网络连接。通道是通信的主题,负责同对端进行网络通信,可以写入数据到对端,也可以从对端读取数据。所有有必要堆通道做个整理。
5.1 构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;//父通道
this.unsafe = this.newUnsafe();//底层的NIO通道,完成实际的NIO操作
this.pipeline = new DefaultChannelPipeline(this);//一个通道一个流水线
}
从构造函数中我们可以明显看出AbstractChannel内部有一个pipeline属性,表示处理器的流水线。初始化通道时,pipeline属性会初始化成DefaultChannelPipeline实例。
其中parent属性表示通道的父通道。对于连接监听通道来说(如NioserverSocketChannel实例),他的父通道就是null,而对于每一条传输通道(如NioSocketChannel实例)他的父通道就是接收到该连接的服务连接监听通道。
几乎所有的通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。
5.2 成员方法
看完构造函数,继续 了解他的成员方法。
ChannelFuture connect(SocketAddress address)
ChannelFuture bind(SocketAddress address)
ChannelFuture close()
关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync( ) 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。
Channel read()
读取通道数据,并且启动入站处理。具体来说,从内部的JavaNIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。
ChannelFuture write(Object o)
启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。
Channel flush()
缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。
5.3 嵌入式通道EmbeddedChannel
这是一个模拟入站和出站的通道,主要用于测试。
六、详解Handler业务处理器
Reactor反应器获取到IO事件后,分发到Handler业务处理器,由handler完成IO操作和业务处理。
整个IO处理的流程包括:通道读数据、数据包解码、业务处理、目标数据编码、数据包写入通道。
黑体字的四个流程是Netty底层负责完成,我们要做的就是业务处理。
前面已经介绍过,从应用程序开发人员的角度来看,有入站和出站两种类型操作。
· 入站处理,触发的方向为:自底向上,Netty的内部(如通道)到ChannelInboundHandler入站处理器。
· 出站处理,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作。
6.1 入站处理器
到数据进入Netty通道后,netty就会触发入站处理器,这里我们就学习一下ChannelInboundHandler。
channelRegistered
当通道注册完后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到。
channelActive
当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件
channelRead
当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件
channelReadComplete
当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件
channelInactive
当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件
exceptionCaught
通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件
6.2 出站处理器
当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层通道的操作。比方说建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作
bind
connect
连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。
write
写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。
flush
腾空缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。
read
从底层读数据:完成Netty通道从Java IO通道的数据读取。
disConnect
断开服务器连接
close
主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道
6.3 通道初始化处理器
通道和Handler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作,发生在通道开始工作之前。
但是如果向流水线中装配业务处理器呢?这就得借助通道的初始化类——ChannelInitializer。
回顾一下代码
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
这里面使用了ChannelInitializer。
initChannel()方法是ChannelInitializer定义的一个抽象方法,在父通道调用initChannel()方法时,会将新接收的通道作为参数,传递给initChannel()方法。initChannel()方法内部大致的业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。
其他的方法就不介绍了类似于前面的两种处理器,感兴趣的可以自己去我的git上看。
七、详解Pipeline流水线
前面讲到,一条Netty通道需要很多的Handler业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除Handler业务处理器。
关于他的流程感兴趣的可以去我的git上看看,这里就不做详细介绍了。
7.1 ChannelHandlerContext上下文
不管我们定义的是哪种类型的Handler业务处理器,最终它们都是以双向链表的方式保存在流水线中。
在Handler业务处理器被添加到流水线中时,会创建一个通道处理器上下文ChannelHandlerContext,它代表了ChannelHandler通道处理器和ChannelPipeline通道流水线之间的关联。
Channel、Handler、ChannelHandlerContext三者的关系为:
Channel通道拥有一条ChannelPipeline通道流水线,每一个流水线节点为一个ChannelHandlerContext通道处理器上下文对象,每一个上下文中包裹了一个ChannelHandler通道处理器。在ChannelHandler通道处理器的入站/出站处理方法中,Netty都会传递一个Context上下文实例作为实际参数。通过Context实例的实参,在业务处理中,可以获取ChannelPipeline通道流水线的实例或者Channel通道的实例。
7.2 截断流水线的处理
在入站/出站的过程中,如果由于业务条件不满足,需要截断流水线的处理,不让处理进入下一站。
出站处理流程只要开始执行,就不能被截断。强行截断的话,Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。
7.3 Handler业务处理器的热拔插
Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热拔插:动态地增加、删除流水线上的业务处理器Handler。主要的Handler热拔插方法声明在ChannelPipeline接口中
八、ByteBuf缓冲区
Netty提供了ByteBuf来替代Java NIO的ByteBuffer缓冲区,以操纵内存缓冲区。
8.1 优势
与Java NIO的ByteBuffer相比,ByteBuf的优势如下:
· Pooling (池化,这点减少了内存复制和GC,提升了效率)·
· 复合缓冲区类型,支持零复制· 不需要调用flip()方法去切换读/写模式
· 扩展性好,例如StringBuffer
· 可以自定义缓冲区类型
· 读取和写入索引分开
· 方法的链式调用
· 可以进行引用计数,方便重复使用
第一个部分是已用字节,表示已经使用完的废弃的无效字节;
第二部分是可读字节,这部分数据是ByteBuf保存的有效数据从ByteBuf中读取的数据都来自这一部分;
第三部分是可写字节,写入到ByteBuf的数据都会写到这一部分中;
第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。
什么是Pooled(池化)的ByteBuf缓冲区呢?
在通信程序的执行过程中,Buffer缓冲区实例会被频繁创建、使用、释放。大家都知道,频繁创建对象、内存分配、释放内存,系统的开销大、性能低,如何提升性能、提高Buffer实例的使用率呢?从Netty4版本开始,新增了对象池化的机制。即创建一个Buffer对象池,将没有被引用的Buffer对象,放入对象缓存池中;当需要时,则重新从对象缓存池中取出,而不需要重新创建。
8.2 重要属性
这三个属性定义在AbstractByteBuf抽象类中,分别是:
· readerIndex(读指针):指示读取的起始位置
· writerIndex(写指针):指示写入的起始位置。
· maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。
8.3 三组方法
第一组:容量系列:capacity()、maxCapacity()
第二组:写入系列:isWritable() 、writableBytes()、maxWritableBytes()、writeBytes(byte[] src) ......
第三组:读取系列:isReadable( )、readableBytes( )、readType()、getTYPE(TYPE value)
......
这里就不做详细介绍了感兴趣的可以去查看源码。
8.4 实践案例
实际使用分三步(1)分配一个ByteBuf实例;(2)向ByteBuf写数据;(3)从ByteBuf读数据。
参考github代码
8.5 引用计数
Netty的ByteBuf的内存回收工作是通过引用计数的方式管理的。JVM中使用“计数器”(一种GC算法)来标记对象是否“不可达”进而收回。Netty也使用了这种手段来对ByteBuf的引用进行计数。Netty采用“计数器”来追踪ByteBuf的生命周期,一是对Pooled ByteBuf的支持,二是能够尽快地“发现”那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。
九、EchoServer回显服务器的实践案例
功能:从服务器端读取客户端输入的数据,然后将数据直接回显到Console控制台。
public class NettyEchoServer
{
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public NettyEchoServer(int port)
{
this.serverPort = port;
}
public void runServer()
{
//创建reactor 线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try
{
//1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e)
{
e.printStackTrace();
} finally
{
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException
{
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new NettyEchoServer(port).runServer();
}
}
9.1 共享NettyEchoServerHandler处理器
EchoServerHandler回显服务器处理器,继承自ChannelInboundHandlerAdapter,然后覆盖了channelRead方法,这个方法在可读IO事件到来时,被流水线回调。
第一步,从channelRead方法的msg参数。
第二步,调用ctx.channel().writeAndFlush() 把数据写回客户端。
.Sharable
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter
{
public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ByteBuf in = (ByteBuf) msg;
Logger.info("msg type: " + (in.hasArray() ? "堆内存" : "直接内存"));
int len = in.readableBytes();
byte[] arr = new byte[len];
in.getBytes(0, arr);
Logger.info("server received: " + new String(arr, "UTF-8"));
//写回数据,异步任务
Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt());
ChannelFuture f = ctx.writeAndFlush(msg);
f.addListener((ChannelFuture futureListener) ->
{
Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
});
}
}
这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享,如果没有加注解,试图将同一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。
9.2 NettyEchoClient客户端代码
客户端Bootstrap的装配和使用,代码如下:
public class NettyEchoClient
{
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public NettyEchoClient(String ip, int port)
{
this.serverPort = port;
this.serverIp = ip;
}
public void runClient()
{
//创建reactor 线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try
{
//1 设置reactor 线程组
b.group(workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(serverIp, serverPort);
//4 设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.handler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess())
{
Logger.info("EchoClient客户端连接成功!");
} else
{
Logger.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
while (scanner.hasNext())
{
//获取输入的内容
String next = scanner.next();
byte[] bytes = (Dateutil.getNow() + " >>" + next).getBytes("UTF-8");
//发送ByteBuf
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
Print.tcfo("请输入发送内容:");
}
} catch (Exception e)
{
e.printStackTrace();
} finally
{
// 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException
{
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new NettyEchoClient(ip, port).runClient();
}
}
在上面的代码中,客户端在连接到服务器端成功后不断循环,获取控制台的输入,通过服务器端的通道发送到服务器。
9.3 NettyEchoClientHandler处理器
客户端的流水线不是空的,还需要装配一个回显处理器,功能很简单,就是接收服务器写过来的数据包,显示在Console控制台上。代码如下:
public class NettyDumpSendClient
{
private int serverPort;
private String serverIp;
Bootstrap b = new Bootstrap();
public NettyDumpSendClient(String ip, int port)
{
this.serverPort = port;
this.serverIp = ip;
}
public void runClient()
{
//创建reactor 线程组
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try
{
//1 设置reactor 线程组
b.group(workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioSocketChannel.class);
//3 设置监听端口
b.remoteAddress(serverIp, serverPort);
//4 设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.handler(new ChannelInitializer<SocketChannel>()
{
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception
{
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
}
});
ChannelFuture f = b.connect();
f.addListener((ChannelFuture futureListener) ->
{
if (futureListener.isSuccess())
{
Logger.info("EchoClient客户端连接成功!");
} else
{
Logger.info("EchoClient客户端连接失败!");
}
});
// 阻塞,直到连接完成
f.sync();
Channel channel = f.channel();
//6发送大量的文字
byte[] bytes = "疯狂创客圈:高性能学习社群!".getBytes(Charset.forName("utf-8"));
for (int i = 0; i < 1000; i++)
{
//发送ByteBuf
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
}
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
} catch (Exception e)
{
e.printStackTrace();
} finally
{
// 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException
{
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
String ip = NettyDemoConfig.SOCKET_SERVER_IP;
new NettyDumpSendClient(ip, port).runClient();
}
通过代码可以看到,从服务器端发送过来的ByteBuf,被手动方式强制释放掉了。当然,也可以使用前面介绍的自动释放方式来释放ByteBuf。
结束语
通过这几天的学习,收获还是不少了解了Netty基本原理:Reactor反应器模式在Netty中的应用,Netty中Reactor反应器、Handler业务处理器、Channel通道以及它们三者之间的相互关系。除此之外还学习了Pipeline流水线,他是Netty为了有效地管理通道和Handler业务处理器之间的关系。
这一节的内容还是挺多的,我自己读起来有时候也感觉晕晕的,但是其实读完把思路整理一下无外乎就几个主要的部分,反应器,通道,选择器,流水线等,建议跟着代码一起学习,一边debug一边理解他的执行流程和每个部分的作用,会更加直观简单。
我是星宇,一个满头黑发,渴望秃头的开发,我们下期见!
以上是关于Netty原理和基础的主要内容,如果未能解决你的问题,请参考以下文章
Netty基础必备知识,ByteBuffer和ByteBuf底层原理