Netty学习
Posted 程序dunk
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty学习相关的知识,希望对你有一定的参考价值。
源代码rpc案例代码在rpc分支下
Java IO到Netty
Netty是一个异步的、基于时间驱动的网络应用框架,它提供了异步的、事件驱动的网络应用程序框架和工具。Netty主要用以快速开发高性能、高可靠的网络服务器和客户端程序
NIO基础
non-blocking io 非阻塞IO
NIO的特点
- 一个线程可以处理多个通道,减少线程出创建的数量
- 读写非阻塞,节约资源,没有可写\\可读数据时,不会发生阻塞导致线程资源的浪费
NIO怎么实现的同步非阻塞
关键就是轮询器(Selector)的使用。轮询器(Selector)负责监视全部通道IO的状态,当其中任意一个或者多个通道具有可用的IO操作时,该轮询器会通过一个方法返回一个大于0的整数,该整数值就表示具体在那个通道上有可用的IO操作。服务器正是通过该轮询器完成单事件轮询机制,并实现了多路复用
Java BIO与NIO比较
BIO(传统IO)
BIO是一个同步并阻塞的IO模式,传统的 java.io 包,它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序
传统BIO模式下的服务器端包含一个接收器(Acceptor)该接收器负责监听每一个客户端的连接请求,并创建相对应的线程来处理该客户端请求,不过当客户端数量急剧增加时,对应服务器端数量也会按照1:1的比例同步增加,势必会占用Java虚拟机中的大量资源,当量变引起质变的时候就会导致系统能急剧下降(譬如:内存溢出、系统崩溃等),于是想到了降低服务器端线程数量(必须满足客户端数量的需求)来解决这个问题
伪异步IO模式
针对传统BIO模式在性能上的瓶颈问题,Java IO通信模型改进设计了一种伪异步IO模型,就是通过在服务端控制线程的数量来灵活有效地调配系统线程资源
此服务器端同样是由Acceptor接收器负责监听连接请求,与传统BIO(一个请求对应一个线程)不同的是,服务器端通过一个任务处理模块Task(主要是通过JDK的Runnable接口来实现)来处理这些客户端连接,Task负责将这些连接请求放入一个线程池(Thread Pool)来处理,这个线程池维护着最大数量为M的活跃线程组(通常客户端数量是远大于M的),再该模式下,由于服务器端负责创建和维护的线程数量可控,因此服务器端占用资源也是可控的,最大程度避免了因资源耗尽而导致的系统崩溃问题
但是该模式底层仍然使用的同步阻塞的BIO,所以无法从根本上解决问题
NIO(Non-blocking / New I/O)
NIO是一种同步非阻塞的IO模型,于 JDK1.4 中引入,对应 java.nio 包,提供了Channel(通道)、Selector(轮询器)、Buffer(缓冲区)等抽象。NIO中的N可以理解为Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发
BIO和NIO对比
IO模型 | BIO | NIO |
---|---|---|
通信 | 面向流 | 面向缓冲 |
处理 | 阻塞IO | 非阻塞IO |
触发 | 无 | 选择器 |
AIO模式
Java AIO(Java Asynchronous IO)模式是在JDK1.7版本中对NIO模式的一种改进。AIO就是异步非阻塞
的IO方式。该模式利用了异步IO操作所基于的事件回调机制,实现了服务器后台操作的非阻塞功能,即服务器会在操作完成后通知相应线程进行后续工作
AIO相比于NIO改进
虽然NIO提供了非阻塞的方法,但本质上NIO的操作还是同步的(体现在Selector同步器上)。具体来讲,就是NIO的服务器线程是在IO操作准备好时得到通知的,接着就有这个线程自行进行IO操作,因此本质上是同步操作
AIO模式下没有轮询器,而是在服务端的IO操作完成后,再给线程发出通知(通过异步回调事件机制)。因此AIO模式不会阻塞的,回调操作是在等待IO操作完成后由系统自动触发
异步模型需要底层操作系统(Kernel)提供支持
- Windows系统通过IOCP实现了真正的异步IO
- Linux系统异步IO在2.6版本引入,但是其底层还是用多路服用模拟了异步IO,性能没有优势(Netty5引入了异步IO,被废弃了)
文件AIO
/**
* @author :zsy
* @date :Created 2021/11/23 19:55
* @description:
*/
@Slf4j
public class AioFileChannel
public static void main(String[] args)
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ))
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
// 参数1:ByteBuffer
// 参数2:读取的起始位置
// 参数3:附加
// 参数4:回调函数,以守护线程的形式回调
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>()
@Override
public void completed(Integer result, ByteBuffer attachment)
log.debug("read completed...", result);
attachment.flip();
// System.out.println(Charset.defaultCharset().decode(attachment));
debugRead(buffer);
@Override
public void failed(Throwable exc, ByteBuffer attachment)
exc.printStackTrace();
);
log.debug("read end");
System.in.read();
catch (IOException e)
e.printStackTrace();
运行结果
网络通信AIO
public class Aioserver
public static void main(String[] args) throws IOException
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
private static void closeChannel(AsynchronousSocketChannel sc)
try
System.out.printf("[%s] %s close\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
catch (IOException e)
e.printStackTrace();
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer>
private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc)
this.sc = sc;
@Override
public void completed(Integer result, ByteBuffer attachment)
try
if (result == -1)
closeChannel(sc);
return;
System.out.printf("[%s] %s read\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
catch (IOException e)
e.printStackTrace();
@Override
public void failed(Throwable exc, ByteBuffer attachment)
closeChannel(sc);
exc.printStackTrace();
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer>
private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc)
this.sc = sc;
@Override
public void completed(Integer result, ByteBuffer attachment)
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining())
sc.write(attachment);
@Override
public void failed(Throwable exc, ByteBuffer attachment)
exc.printStackTrace();
closeChannel(sc);
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object>
private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc)
this.ssc = ssc;
@Override
public void completed(AsynchronousSocketChannel sc, Object attachment)
try
System.out.printf("[%s] %s connected\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
catch (IOException e)
e.printStackTrace();
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
@Override
public void failed(Throwable exc, Object attachment)
exc.printStackTrace();
BIO、NIO和AIO区别
BIO:一个连接一个线程,客户端有连接请求时服务端就需要启动一个线程进行处理。线程开销大
伪异步IO:将请求连接放入线程池,一对多,但是线程资源依然有限
NIO:一个请求一个线程,但客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有IP请求是才启动一个线程进行处理
AIO:一个有效请求一个线程,客户端的IO请求都是由OS先完成了再通知服务器应用去启动线程进行处理
BIO是面向流的,NIO是面向缓冲区的;BIO的各种流是阻塞的。而NIO是非阻塞的;BIO的Stream是单向的,而NIO的channel是双向的。
NIO的特点:事件驱动模型、单线程处理多任务、非阻塞I/O,I/O读写不再阻塞,而是返回0、基于block的传输比基于流的传输更高效、更高级的IO函数zero-copy、IO多路复用大大提高了Java网络应用的可伸缩性和实用性。基于Reactor线程模型。
零拷贝
传统IO的问题
传统IO将一个文件通过socket写出的步骤
File f = new Flie("helloworld/data.txt");
RandomAccessFile flie = RandomAccessFile(f, "r");
byte[] buf = new byte[(int) f.length];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
工作过程
-
Java本身不具备IO读写能力,因此read方法调用后,要从java程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,其间也不会使用cpu
DMA也可以理解为硬件单元,用来解放cpu完成文件IO
-
从内核态切换回用户态,将数据从内缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用DMA
-
调用write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,cpu会参与拷贝
-
接下来要向网卡写数据,这项能力java又不具备,因此又需要从用户态切换这内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到中间环节较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层真正读写是操作系统来完成的
- 用户态与内核态切换发生了3次,这个操作比较耗费资源
- 数据拷贝了4次
NIO的优化
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
java可以使用DirectByteBuffer将堆外内存映射到JVM内存中来直接访问使用
- 这块内存不收JVM垃圾回收的影响,因此内存地址固定,有助于IO读写
- Java中的DirectByteBuffer对象仅维护了此内存的虚引用,内存回收分成两部分
- DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
sendFile
进一步优化(底层采用了linux2.1后提供的sendFile方法),Java中对应两个channel调用transferTo/transferFrom方法拷贝数据
- Java调用transferTo方法后,要从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
- 数据从内核缓冲传输到socket缓冲区,cpu会参与拷贝
- 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu
可以看到
- 只发生了一次用户态内核态的切换
- 数据拷贝了3次
linux2.4后
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
三大组件
Channel(通道)
传统IO操作对read()或write()方法的调用,可能会因为没有数据可读/可写而阻塞,直到有数据响应。也就是说读写数据的IO调用,可能会无限期的阻塞等待,效率依赖网络传输的速度。最重要的是在调用一个方法前,无法知道是否会被阻塞。
NIO的Channel抽象了一个重要特征就是可以通过配置它的阻塞行为,来实现非阻塞式的通道。
Channel是一个双向通道,与传统IO操作只允许单向的读写不同的是,NIO的Channel允许在一个通道上进行读和写的操作。
主要实现
- FileChannel:文件数据传输通道
- SocketChannel:TCP网络编程数据传输通道服务器端和客户端
- ServerSocketChannel:TCP网络编程数据传输通道服务器端
- DatagramChannel:UDP网络编程数据传输通道
Buffer(缓冲区)
buffer顾名思义,他是一个缓冲区,实际上是一个容器,一个连续数组,Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer
常见的 buffer 有
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Buffer缓冲区本质是一块可以写入数据,然后可以从中读取数据的内存这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该模块内存。为了理解Buffer的工作原理,需要熟悉它的三个属性:capacity、position和limit。
Selector(多路复用器)
Selector与Channel是相互配合使用的,将Channel注册在Selector上之后,才可以正确的使用Selector,但此时Channel必须为非阻塞模式,Selector可以监听Channel的四种状态(Connect、Accept、Read、Write),当监听到某一个Channel的某个状态时,才允许对Channel进行相应的操作,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景(low traffic)
- Connect:某一个客户端连接成功后
- Accept:准备好进行连接
- Read:可读
- Write:可写
调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理
ByteBuffer
最佳实践
在类路径下创建一个data.txt文件
使用byteBuffer读取文件数据
/**
* @author :zsy
* @date :Created 2021/11/19 20:33
* @description:测试ByteBuffer
*/
@Slf4j
public class TestByteBuffer
public static void main(String[] args)
// FileChannel
// 通过输入输出流获取文件 或者 RandomAccessFile
// try-with-resource语法无需自己写代码关闭资源,资源必须实现AutoClosable接口,重写close方法
//原理:编译器自动帮我们生成了finally块,并且在里面调用了资源的close方法
try (FileChannel channel = new FileInputStream("data.txt").getChannel())
// 准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true)
// 从channel中读数据,向buffer写入数据
int len = channel.read(buffer);
log.debug("读到字节数:", len);
if (len == -1) break;
// 切换至读模式
buffer.flip();
// 打印buffer中的内容
while (buffer.hasRemaining()) // 还有剩余未读数据
byte b = buffer.get();
System.out.print((char) b);
// 切换至写模式
buffer.clear();
catch (IOException e)
;
twr
twr(try-with-resources):如果在try语句块同时打开了多个资源,那么在finally语句块中为了关闭所有的资源,不得不借助finally中嵌套finally的方式关闭所有的资源,如下图
public class Demo
public static void main(String[] args)
BufferedInputStream bin = null;
BufferedOutputStream bout = null;
try
bin = new BufferedInputStream(new FileInputStream(new File("test.txt")));
bout = new BufferedOutputStream(new FileOutputStream(new File("out.txt")));
int b;
while ((b = bin.read()) != -1)
bout.write(b);
catch (IOException e)
e.printStackTrace();
finally
if (bin != null)
try
bin.close();
catch (IOException e)
throw e;
finally
if (bout != null)
try
bout.close();
catch (IOException e)
throw e;
关闭资源的代码比业务代码还多,这是因为,我们不仅需要关闭 BufferedInputStream
,还需要保证如果关闭 BufferedInputStream
时出现了异常, BufferedOutputStream
也要能被正确地关闭。所以我们不得不借助finally中嵌套finally
JDK1.7中引入了try-with-resources语法来打开资源,无需程序员自己关闭资源
RandomAccessFile
/**
* @author :zsy
* @date :Created 2021/11/19 20:51
* @description:
*/
@Slf4j
public class TestByteBuffer0
public static void main(String[] args)
try (RandomAccessFile file = new RandomAccessFile("data.txt", "rw"))
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true)
int len = channel.read(buffer);
buffer.flip();
log.debug("读取到的字节数:", len);
if (len == -1) break;
while (buffer.hasRemaining())
byte b = buffer.get()以上是关于Netty学习的主要内容,如果未能解决你的问题,请参考以下文章