Netty学习

Posted 程序dunk

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty学习相关的知识,希望对你有一定的参考价值。

源代码rpc案例代码在rpc分支下

Java IO到Netty

Netty是一个异步的、基于时间驱动的网络应用框架,它提供了异步的、事件驱动的网络应用程序框架和工具。Netty主要用以快速开发高性能、高可靠的网络服务器和客户端程序

NIO基础

non-blocking io 非阻塞IO

NIO的特点

  • 一个线程可以处理多个通道,减少线程出创建的数量
  • 读写非阻塞,节约资源,没有可写\\可读数据时,不会发生阻塞导致线程资源的浪费

NIO怎么实现的同步非阻塞

关键就是轮询器(Selector)的使用。轮询器(Selector)负责监视全部通道IO的状态,当其中任意一个或者多个通道具有可用的IO操作时,该轮询器会通过一个方法返回一个大于0的整数,该整数值就表示具体在那个通道上有可用的IO操作。服务器正是通过该轮询器完成单事件轮询机制,并实现了多路复用

IO与NIO

Java BIO与NIO比较

BIO(传统IO)

BIO是一个同步并阻塞的IO模式,传统的 java.io 包,它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序

传统BIO模式下的服务器端包含一个接收器(Acceptor)该接收器负责监听每一个客户端的连接请求,并创建相对应的线程来处理该客户端请求,不过当客户端数量急剧增加时,对应服务器端数量也会按照1:1的比例同步增加,势必会占用Java虚拟机中的大量资源,当量变引起质变的时候就会导致系统能急剧下降(譬如:内存溢出、系统崩溃等),于是想到了降低服务器端线程数量(必须满足客户端数量的需求)来解决这个问题

伪异步IO模式

针对传统BIO模式在性能上的瓶颈问题,Java IO通信模型改进设计了一种伪异步IO模型,就是通过在服务端控制线程的数量来灵活有效地调配系统线程资源

此服务器端同样是由Acceptor接收器负责监听连接请求,与传统BIO(一个请求对应一个线程)不同的是,服务器端通过一个任务处理模块Task(主要是通过JDK的Runnable接口来实现)来处理这些客户端连接,Task负责将这些连接请求放入一个线程池(Thread Pool)来处理,这个线程池维护着最大数量为M的活跃线程组(通常客户端数量是远大于M的),再该模式下,由于服务器端负责创建和维护的线程数量可控,因此服务器端占用资源也是可控的,最大程度避免了因资源耗尽而导致的系统崩溃问题

但是该模式底层仍然使用的同步阻塞的BIO,所以无法从根本上解决问题

NIO(Non-blocking / New I/O)

NIO是一种同步非阻塞的IO模型,于 JDK1.4 中引入,对应 java.nio 包,提供了Channel(通道)、Selector(轮询器)、Buffer(缓冲区)等抽象。NIO中的N可以理解为Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发

BIO和NIO对比
IO模型BIONIO
通信面向流面向缓冲
处理阻塞IO非阻塞IO
触发选择器

AIO模式

Java AIO(Java Asynchronous IO)模式是在JDK1.7版本中对NIO模式的一种改进。AIO就是异步非阻塞的IO方式。该模式利用了异步IO操作所基于的事件回调机制,实现了服务器后台操作的非阻塞功能,即服务器会在操作完成后通知相应线程进行后续工作

AIO相比于NIO改进

虽然NIO提供了非阻塞的方法,但本质上NIO的操作还是同步的(体现在Selector同步器上)。具体来讲,就是NIO的服务器线程是在IO操作准备好时得到通知的,接着就有这个线程自行进行IO操作,因此本质上是同步操作

AIO模式下没有轮询器,而是在服务端的IO操作完成后,再给线程发出通知(通过异步回调事件机制)。因此AIO模式不会阻塞的,回调操作是在等待IO操作完成后由系统自动触发

异步模型需要底层操作系统(Kernel)提供支持

  • Windows系统通过IOCP实现了真正的异步IO
  • Linux系统异步IO在2.6版本引入,但是其底层还是用多路服用模拟了异步IO,性能没有优势(Netty5引入了异步IO,被废弃了)
文件AIO
/**
 * @author :zsy
 * @date :Created 2021/11/23 19:55
 * @description:
 */
@Slf4j
public class AioFileChannel 

    public static void main(String[] args) 
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) 

            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.debug("read begin...");
            // 参数1:ByteBuffer
            // 参数2:读取的起始位置
            // 参数3:附加
            // 参数4:回调函数,以守护线程的形式回调
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() 
                @Override
                public void completed(Integer result, ByteBuffer attachment) 
                    log.debug("read completed...", result);
                    attachment.flip();
                    // System.out.println(Charset.defaultCharset().decode(attachment));
                    debugRead(buffer);
                

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) 
                    exc.printStackTrace();
                
            );
            log.debug("read end");
            System.in.read();
         catch (IOException e) 
            e.printStackTrace();
        
    

运行结果

网络通信AIO
public class Aioserver 
    public static void main(String[] args) throws IOException 
        AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.accept(null, new AcceptHandler(ssc));
        System.in.read();
    

    private static void closeChannel(AsynchronousSocketChannel sc) 
        try 
            System.out.printf("[%s] %s close\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
         catch (IOException e) 
            e.printStackTrace();
        
    

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> 
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) 
            this.sc = sc;
        

        @Override
        public void completed(Integer result, ByteBuffer attachment) 
            try 
                if (result == -1) 
                    closeChannel(sc);
                    return;
                
                System.out.printf("[%s] %s read\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
             catch (IOException e) 
                e.printStackTrace();
            
        

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) 
            closeChannel(sc);
            exc.printStackTrace();
        
    

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> 
        private final AsynchronousSocketChannel sc;

        private WriteHandler(AsynchronousSocketChannel sc) 
            this.sc = sc;
        

        @Override
        public void completed(Integer result, ByteBuffer attachment) 
            // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
            if (attachment.hasRemaining()) 
                sc.write(attachment);
            
        

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) 
            exc.printStackTrace();
            closeChannel(sc);
        
    

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> 
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) 
            this.ssc = ssc;
        

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) 
            try 
                System.out.printf("[%s] %s connected\\n", Thread.currentThread().getName(), sc.getRemoteAddress());
             catch (IOException e) 
                e.printStackTrace();
            
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        

        @Override
        public void failed(Throwable exc, Object attachment) 
            exc.printStackTrace();
        
    

BIO、NIO和AIO区别

BIO:一个连接一个线程,客户端有连接请求时服务端就需要启动一个线程进行处理。线程开销大

伪异步IO:将请求连接放入线程池,一对多,但是线程资源依然有限

NIO:一个请求一个线程,但客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有IP请求是才启动一个线程进行处理

AIO:一个有效请求一个线程,客户端的IO请求都是由OS先完成了再通知服务器应用去启动线程进行处理

BIO是面向流的,NIO是面向缓冲区的;BIO的各种流是阻塞的。而NIO是非阻塞的;BIO的Stream是单向的,而NIO的channel是双向的。

NIO的特点:事件驱动模型、单线程处理多任务、非阻塞I/O,I/O读写不再阻塞,而是返回0、基于block的传输比基于流的传输更高效、更高级的IO函数zero-copy、IO多路复用大大提高了Java网络应用的可伸缩性和实用性。基于Reactor线程模型。

零拷贝

传统IO的问题

传统IO将一个文件通过socket写出的步骤

File f = new Flie("helloworld/data.txt");
RandomAccessFile flie = RandomAccessFile(f, "r");

byte[] buf = new byte[(int) f.length];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

工作过程

  1. Java本身不具备IO读写能力,因此read方法调用后,要从java程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用DMA(Direct Memory Access)来实现文件读,其间也不会使用cpu

    DMA也可以理解为硬件单元,用来解放cpu完成文件IO

  2. 从内核态切换回用户态,将数据从内缓冲区读入用户缓冲区(即byte[] buf),这期间cpu会参与拷贝,无法利用DMA

  3. 调用write方法,这时将数据从用户缓冲区(byte[] buf)写入socket缓冲区,cpu会参与拷贝

  4. 接下来要向网卡写数据,这项能力java又不具备,因此又需要从用户态切换这内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到中间环节较多,java的IO实际不是物理设备级别的读写,而是缓存的复制,底层真正读写是操作系统来完成的

  • 用户态与内核态切换发生了3次,这个操作比较耗费资源
  • 数据拷贝了4次

NIO的优化

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

java可以使用DirectByteBuffer将堆外内存映射到JVM内存中来直接访问使用

  • 这块内存不收JVM垃圾回收的影响,因此内存地址固定,有助于IO读写
  • Java中的DirectByteBuffer对象仅维护了此内存的虚引用,内存回收分成两部分
    • DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

sendFile

进一步优化(底层采用了linux2.1后提供的sendFile方法),Java中对应两个channel调用transferTo/transferFrom方法拷贝数据

  • Java调用transferTo方法后,要从Java程序的用户态切换至内核态,使用DMA将数据读入内核缓冲区,不会使用cpu
  • 数据从内核缓冲传输到socket缓冲区,cpu会参与拷贝
  • 最后使用DMA将socket缓冲区的数据写入网卡,不会使用cpu

可以看到

  • 只发生了一次用户态内核态的切换
  • 数据拷贝了3次

linux2.4后

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

三大组件

Channel(通道)

传统IO操作对read()或write()方法的调用,可能会因为没有数据可读/可写而阻塞,直到有数据响应。也就是说读写数据的IO调用,可能会无限期的阻塞等待,效率依赖网络传输的速度。最重要的是在调用一个方法前,无法知道是否会被阻塞。

NIO的Channel抽象了一个重要特征就是可以通过配置它的阻塞行为,来实现非阻塞式的通道。

Channel是一个双向通道,与传统IO操作只允许单向的读写不同的是,NIO的Channel允许在一个通道上进行读和写的操作。

主要实现

  • FileChannel:文件数据传输通道
  • SocketChannel:TCP网络编程数据传输通道服务器端和客户端
  • ServerSocketChannel:TCP网络编程数据传输通道服务器端
  • DatagramChannel:UDP网络编程数据传输通道

Buffer(缓冲区)

buffer顾名思义,他是一个缓冲区,实际上是一个容器,一个连续数组,Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer

常见的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Buffer缓冲区本质是一块可以写入数据,然后可以从中读取数据的内存这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该模块内存。为了理解Buffer的工作原理,需要熟悉它的三个属性:capacity、position和limit。

Selector(多路复用器)

Selector与Channel是相互配合使用的,将Channel注册在Selector上之后,才可以正确的使用Selector,但此时Channel必须为非阻塞模式,Selector可以监听Channel的四种状态(Connect、Accept、Read、Write),当监听到某一个Channel的某个状态时,才允许对Channel进行相应的操作,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景(low traffic)

  • Connect:某一个客户端连接成功后
  • Accept:准备好进行连接
  • Read:可读
  • Write:可写

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

ByteBuffer

最佳实践

在类路径下创建一个data.txt文件

使用byteBuffer读取文件数据

/**
 * @author :zsy
 * @date :Created 2021/11/19 20:33
 * @description:测试ByteBuffer
 */
@Slf4j
public class TestByteBuffer 
    public static void main(String[] args) 
        // FileChannel
        // 通过输入输出流获取文件 或者 RandomAccessFile
        // try-with-resource语法无需自己写代码关闭资源,资源必须实现AutoClosable接口,重写close方法
        //原理:编译器自动帮我们生成了finally块,并且在里面调用了资源的close方法
        try (FileChannel channel = new FileInputStream("data.txt").getChannel()) 
            // 准备缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) 
                // 从channel中读数据,向buffer写入数据
                int len = channel.read(buffer);
                log.debug("读到字节数:", len);
                if (len == -1) break;
                // 切换至读模式
                buffer.flip();
                // 打印buffer中的内容
                while (buffer.hasRemaining())  // 还有剩余未读数据
                    byte b = buffer.get();
                    System.out.print((char) b);
                
                // 切换至写模式
                buffer.clear();
            
         catch (IOException e) 
        
        ;
    

twr

twr(try-with-resources):如果在try语句块同时打开了多个资源,那么在finally语句块中为了关闭所有的资源,不得不借助finally中嵌套finally的方式关闭所有的资源,如下图

public class Demo 
  public static void main(String[] args) 
    BufferedInputStream bin = null;
    BufferedOutputStream bout = null;
    try 
      bin = new BufferedInputStream(new FileInputStream(new File("test.txt")));
      bout = new BufferedOutputStream(new FileOutputStream(new File("out.txt")));
      int b;
      while ((b = bin.read()) != -1) 
        bout.write(b);
      
    
    catch (IOException e) 
      e.printStackTrace();
    
    finally 
      if (bin != null) 
        try 
          bin.close();
        
        catch (IOException e) 
          throw e;
        
        finally 
          if (bout != null) 
            try 
              bout.close();
            
            catch (IOException e) 
              throw e;
            
          
        
      
    
  

关闭资源的代码比业务代码还多,这是因为,我们不仅需要关闭 BufferedInputStream,还需要保证如果关闭 BufferedInputStream时出现了异常, BufferedOutputStream也要能被正确地关闭。所以我们不得不借助finally中嵌套finally

JDK1.7中引入了try-with-resources语法来打开资源,无需程序员自己关闭资源

原理

RandomAccessFile
/**
 * @author :zsy
 * @date :Created 2021/11/19 20:51
 * @description:
 */
@Slf4j
public class TestByteBuffer0 

    public static void main(String[] args) 

        try (RandomAccessFile file = new RandomAccessFile("data.txt", "rw")) 
            FileChannel channel = file.getChannel();
            ByteBuffer buffer = ByteBuffer.allocate(10);
            while (true) 
                int len = channel.read(buffer);
                buffer.flip();
                log.debug("读取到的字节数:", len);
                if (len == -1) break;
                while (buffer.hasRemaining()) 
                    byte b = buffer.get()以上是关于Netty学习的主要内容,如果未能解决你的问题,请参考以下文章

灰常详细的Netty实现聊天室的代码解析

netty学习总结

Netty学习

RPC&Netty 学习之路

从Netty到EPollSelectorImpl学习Java NIO

品Netty源码,学习良好的编程习惯