Java NIO 篇

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO 篇相关的知识,希望对你有一定的参考价值。

1. 什么是Java NIO?

     1)  Java 的NIO是基于 多路I/O 实现的,在多路I/O 复用的模型中有一个selector 的线程不断轮询并监听多个socket的状态,只有在socket有读写的时候,才会通知用户线程进行I/O 读写操作。

     2)  NIO Server 通过selector.select() 来获取查看通道里是否有时间到达,如果没有,那么用户线程一直会处于阻塞状态,多路I/O 复用只用了一个selector线程就能监听和管理多个socket 通道,因此I/O多路复用在连接数众多的时候有很大的优势。

           优点:  当响应体很小的时候,能够节约很多管理资源。

           缺点:  当响应题数据很大的时候,如果我们通过一个selector去轮询处理事件,那么一个线程的selector会成为性能的瓶颈。 在实际开发中,在多路复用的方法体内不做复杂的逻辑运算,一般只做接收信息和转发的作用,实际逻辑处理转交给后续的业务线程去处理。

     3)   客户端与服务端的交互是通过 buffer 和channel 实现的, 读写都是先通过buffer然后再进入到channel里进行数据传输。
     4)    NIO三大组件

buffer:  缓冲区。

channel:  通道。 

selector:   选择器,监听socketChannel, 一个socket对应一个channel, 一个selector可以监听并管理多个socket。

     5)   NIO 数据流向原理

 理解了NIO的原理后,可以尝试写一个简单的NIO server和NIO client。

2. 手写一个NIO框架

   2.1  手写NIO Server

package com.nio.demo.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * @author bingbing
 * @date 2021/5/13 0013 18:55
 */
public class MyServer {


    private int size = 1024;

    private ServerSocketChannel serverSocketChannel;

    private ByteBuffer byteBuffer;

    private Selector selector;


    private int remoteClientNum = 0;

    private MyServer(int port) {
        try {
            initChannel(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void initChannel(int port) throws IOException {
        // 1. 打开信道 Channel
        serverSocketChannel = ServerSocketChannel.open();
        // 2. 设置为非阻塞模式。
        serverSocketChannel.configureBlocking(false);
        // 3. 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(port));
        // 4. 创建选择器
        selector = Selector.open();
        // 5. 向选择器注册通道。
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 6. 分配缓存区的大小
        byteBuffer = ByteBuffer.allocate(size);
    }

    private void listen() throws Exception {
        // 设置监听器,用于监听channel里的数据变化
        System.out.println("开始监听请求...");
        while (true) {
            // n 表示有多少个channel准备好
            int n = selector.select();
            if (n == 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // 遍历selectionKey
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    // 获取Channel
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // 获取连接
                    SocketChannel channel = server.accept();
                    System.out.println("收到一个新的连接...");
                    // Channel 注册
                    registerChannel(selector, channel, SelectionKey.OP_READ);
                    // 客户端数+1
                    remoteClientNum++;
                    // 响应数据
                    write(channel, "hello !".getBytes());
                }

                if (key.isReadable()) {
                    read(key);
                }
                iterator.remove();
            }
        }
    }

    /**
     * 读取数据
     *
     * @param selectionKey
     */
    private void read(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        int count;
        byteBuffer.clear();
        // 从缓冲区里读取数据
        System.out.println("client:");
        while ((count = socketChannel.read(byteBuffer)) > 0) {
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                System.out.print((char) byteBuffer.get());
            }
            byteBuffer.clear();
        }
        if (count < 0) {
            socketChannel.close();
        }

    }

    private void write(SocketChannel socketChannel, byte[] data) {
        // 往缓存区里写数据
        byteBuffer.clear();
        byteBuffer.put(data);
        byteBuffer.flip();
        // 将缓存区的数据写入到管道
        try {
            socketChannel.write(byteBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 绑定selector
     *
     * @param selector
     * @param socketChannel
     * @param op
     */
    private void registerChannel(Selector selector, SocketChannel socketChannel, int op) {
        // 将信道与selector绑定
        if (socketChannel == null) {
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, op);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        MyServer server = new MyServer(8080);
        try {
            server.listen();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

         解析:   Server 端通过selector.select()方法监听 socketChannel, 如果有事件从channel过来,那么通过socketChannel.write(byteBuffer) 响应客户端,  另外判断selectionKey是否为可读,如果可读那么读取客户端发过来的数据。

   2.2 手写NIO Client

package com.nio.demo.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author bingbing
 * @date 2021/5/13 0013 18:55
 */
public class MyClient {


    private int size = 1024;

    private ByteBuffer byteBuffer;

    private SocketChannel socketChannel;

    public void connectServer() throws Exception {
        socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
        byteBuffer = ByteBuffer.allocate(size);
        socketChannel.configureBlocking(false);
        receive();
    }

    private void receive() throws IOException {
        while (true) {
            byteBuffer.clear();
            int count;
            while ((count = socketChannel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                System.out.println("server:");
                while (byteBuffer.hasRemaining()) {
                    System.out.print((char) byteBuffer.get());
                }
                // 向服务器写数据
                sendToServer("say , hi".getBytes());
                byteBuffer.clear();
            }
        }
    }


    private void sendToServer(byte[] data) throws IOException {
        byteBuffer.clear();
        byteBuffer.put(data);
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    }


    public static void main(String[] args) {
        try {
            new MyClient().connectServer();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 高性能的NIO框架Netty

       Netty是一个基于IO多路复用的高性能的网络层框架, 是目前最流行的NIO框架, 分布式框架dubbo也集成了Netty, Netty 是Dubbo高并发的关键。

     3.1 NettyServer关键类、接口和方法学习

       基于Netty 4.1

       在学习nettyServer前,我们可以先了解一下一些关键类和接口的用法,有助于帮我们熟悉netty代码的执行流程。

     3.1.1 ServerSocketChannel接口

         他有4个实现类,实现了初始化socketChannel通道、提供信道的socket、绑定ip、读取消息等方法。其中常用的3个实现类分别是EpollServerSocketChannel, KQueueServerSoketChannel,NioserverSocketChannel 。

        为什么有这3个实现类?

        这3个实现类其实对应的是3中IO多路复用的实现方式。其中epoll是linux环境下独有的IO多路复用的实现方式,kqueue是unix和mac os系统上运行的IO多路复用的实现方式,NioServerSocketChannel是通过selector选择器实现的IO多路复用。

        

   3.1.2 ServerBootstrap类

            Netty服务端的启动根类,继承AbastractBootStrap, 主要作用初始服务端的信道Channel(在init(Channel channel)方法里实现)。

            在init()方法做server端需要的初始化动作,初始化信道Channel, 然后将EventLoopGroup 与ServerBootStrapAcceptor绑定用来监听socket。

  @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

      3.1.3 group()   方法

 group(EventLoopGroup group)

 Specify the EventLoopGroup which is used for the parent (acceptor) and the child (client).

      官方解释:  指定一个EventLoopGroup, 用来给父亲或者孩子,其实都是Channel。父亲表示的是管道的输入,孩子表示管道的输出。

      通俗解释: 设置组, 指定一个事件循环组,组里成员分别是接收数据的成员和发送数据的Channel,分别称为 acceptor和client。

     3.1.4 ChannelInboundHandlerAdapter类

          重要方法: 

          1)  channelActive(ChannelHandlerContext ctx)  , 初始化ChannelHandlerContext实例, 可以通过该实例往pipeline里写数据。

  @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

        2)  channelRead(ChannelHandlerContex t, Object msg);  该类读取从管道留过来的数据。    

 @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify();
    }

     3)   在ChannelInboundHandlerAdapter的实现类里,一般需要实现Callable接口,该接口里的call方法在线程执行后被调用。

public synchronized Object call() throws Exception {
        context.writeAndFlush(this.invocation);
        wait();
        return result;
    }

    3.2 NettyClient关键类、接口和方法学习

     3.2.1  客户端启动类Bootstrap

        bootstrap类是client的启动类,通过bootstrap 初始化信道绑定pipeline.

        bootstrap里的重要方法

        1) connect(hostName,port) 方法, 此方法用来连接服务端。

        2) option() 方法,此方法来来自于抽象类AbstractBootstrap,用来设置响应参数,如 ChannelOption.TCP_NODELAY, true, 设置为无延迟的TCP响应。

以上是关于Java NIO 篇的主要内容,如果未能解决你的问题,请参考以下文章

Java NIO5:选择器1---理论篇

Java NIO5:选择器1---理论篇

Java.nio:最简洁的递归目录删除

java nio网络编程服务篇入门

从Netty到EPollSelectorImpl学习Java NIO

Java NIO 篇