Java NIO三组件——Selecotr/Channel实现原理解析

Posted 兴趣使然的草帽路飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO三组件——Selecotr/Channel实现原理解析相关的知识,希望对你有一定的参考价值。

目前很多高性能的Java RPC框架都是基于Netty实现的,而Netty的设计原理又离不开Java NIO。本篇笔记是对NIO核心三件套:缓冲区(Buffer)、选择器 (Selector)和通道(Channel),其中后两者选择器与通道实现原理的学习总结。

一、NIO聊天室入门案例

在学习原理之前,先来了解一个Java NIO实现聊天室的小案例,该案例只有三个类:Nioserver 聊天室服务端、NioClient 聊天室客户端、ClientThread 客户端线程。

服务端代码:

/**
 * Nio聊天室服务端
 *
 * @author csp
 * @date 2021-11-30 4:13 下午
 */
public class NioServer 

    /**
     * 聊天室成员列表:
     */
    Map<String, SocketChannel> memberChannels;

    /**
     * 端口
     */
    private static final int PORT = 8080;

    /**
     * 选择器
     */
    private Selector selector;

    /**
     * 管道
     */
    private ServerSocketChannel server;

    /**
     * 缓冲
     */
    private ByteBuffer buffer;

    public NioServer() throws IOException 
        // 初始化Selector选择器
        this.selector = Selector.open();
        // 初始化Channel通道
        this.server = getServerChannel(selector);
        // 初始化Buffer缓冲:
        this.buffer = ByteBuffer.allocate(1024);
        // 初始化聊天室成员列表
        memberChannels = new ConcurrentHashMap<>();
    

    /**
     * 初始化Channel通道
     *
     * @param selector
     * @return
     * @throws IOException
     */
    private ServerSocketChannel getServerChannel(Selector selector) throws IOException 
        // 开辟一个Channel通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 通道设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 通道注册绑定Selector选择器,通道中数据的事件类型为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 通道绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));

        return serverSocketChannel;
    

    /**
     * 事件监听
     */
    public void listen() throws IOException 
        System.out.println("服务端启动......");
        try 
            // 无限循环
            while (true) 
                // 作用:至少需要有一个事件发生,否则(如果count == 0)就继续阻塞循环
                int count = selector.select();
                if (count == 0) 
                    continue;
                
                // 获取SelectorKey的集合
                Set<SelectionKey> keySet = selector.selectedKeys();

                Iterator<SelectionKey> iterator = keySet.iterator();
                while (iterator.hasNext()) 
                    // 当前事件对应的SelectorKey
                    SelectionKey key = iterator.next();
                    // 删除当前事件:表示当前事件已经被消费了
                    iterator.remove();
                    if (key.isAcceptable()) 
                        // Accept类型事件:
                        // 通过key获取ServerSocketChannel
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        // 通过ServerSocketChannel获取SocketChannel
                        SocketChannel channel = server.accept();
                        // channel设置为非阻塞模式
                        channel.configureBlocking(false);
                        // channel绑定选择器
                        channel.register(selector, SelectionKey.OP_READ);

                        // 从channel中获取Host、端口等信息
                        System.out.println("客户端连接:"
                                + channel.socket().getInetAddress().getHostName() + ":"
                                + channel.socket().getPort());
                     else if (key.isReadable()) 
                        // Read类型事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 用于解密消息内容
                        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

                        // 将消息数据从通道channel读取到缓冲buffer
                        //ByteBuffer buffer = ByteBuffer.allocate(50);
                        buffer.clear();
                        channel.read(buffer);
                        buffer.flip();
                        // 获取解密后的消息内容:
                        String msg = decoder.decode(buffer).toString();
                        if (!"".equals(msg)) 
                            System.out.println("收到:" + msg);
                            if (msg.startsWith("username=")) 
                                String username = msg.replaceAll("username=", "");
                                memberChannels.put(username, channel);
                                System.out.println("用户总数:" + memberChannels.size());
                             else 
                                // 转发消息给客户端
                                String[] arr = msg.split(":");
                                if (arr.length == 3) 
                                    // 发送者
                                    String from = arr[0];
                                    // 接收者
                                    String to = arr[1];
                                    // 发送内容
                                    String content = arr[2];
                                    System.out.println(from + "发送给" + to + "的消息:" + content);

                                    if (memberChannels.containsKey(to)) 
                                        // 解密
                                        CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
                                        // 给接收者发送消息
                                        memberChannels.get(to).write(encoder.encode(CharBuffer.wrap(from + ":" + content)));
                                    
                                
                            
                        

                    
                
            
        catch (Exception e)
            System.out.println("服务端启动失败......");
            e.printStackTrace();
        finally 
            try 
                // 先关闭选择器,在关闭通道
                // 调用 close() 方法将会关闭Selector,同时也会将关联的SelectionKey失效,但不会关闭Channel。
                selector.close();
                server.close();
             catch (IOException e) 
                e.printStackTrace();
            
        
    

    public static void main(String[] args) throws IOException 
        // 服务端启动:
        new NioServer().listen();
    

客户端线程类:

/**
 * Nio聊天室客户端线程
 *
 * @author csp
 * @date 2021-11-30 4:13 下午
 */
public class ClientThread extends Thread 
    /**
     * 解密
     */
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    /**
     * 加密
     */
    private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();

    /**
     * 选择器
     */
    private Selector selector = null;

    /**
     * 通道
     */
    private SocketChannel socket = null;

    /**
     * 通道key
     */
    private SelectionKey clientKey = null;

    /**
     * 用户名
     */
    private String username;

    public ClientThread(String username) 
        try 
            // 创建一个Selector
            selector = Selector.open();

            // 创建Socket并注册
            socket = SocketChannel.open();
            socket.configureBlocking(false);
            clientKey = socket.register(selector, SelectionKey.OP_CONNECT);

            // 连接到远程地址
            InetSocketAddress ip = new InetSocketAddress("localhost", 8080);
            socket.connect(ip);

            this.username = username;
         catch (IOException e) 
            e.printStackTrace();
        
    

    /**
     * 开辟读取事件的线程
     */
    @Override
    public void run() 
        try 
            // 监听事件(无限循环)
            while (true) 
                // 监听事件
                selector.select();
                // 事件来源列表
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) 
                    SelectionKey key = it.next();
                    // 删除当前事件
                    it.remove();

                    // 判断事件类型
                    if (key.isConnectable()) 
                        // 连接事件
                        SocketChannel channel = (SocketChannel) key.channel();
                        if (channel.isConnectionPending())
                            channel.finishConnect();
                        channel.register(selector, SelectionKey.OP_READ);
                        System.out.println("连接服务器端成功!");

                        // 发送用户名
                        send("username=" + this.username);
                     else if (key.isReadable()) 
                        // 读取数据事件
                        SocketChannel channel = (SocketChannel) key.channel();

                        // 读取数据
                        ByteBuffer buffer = ByteBuffer.allocate(50);
                        channel.read(buffer);
                        buffer.flip();
                        String msg = decoder.decode(buffer).toString();
                        System.out.println("收到:" + msg);
                    
                
            
         catch (IOException e) 
            e.printStackTrace();
         finally 
            // 关闭
            try 
                selector.close();
                socket.close();
             catch (IOException e) 
            
        
    

    /**
     * 发送消息
     *
     * @param msg
     */
    public void send(String msg) 
        try 
            SocketChannel client = (SocketChannel) clientKey.channel();
            client.write(encoder.encode(CharBuffer.wrap(msg)));
         catch (Exception e) 
            e.printStackTrace();
        
    

    /**
     * 关闭客户端
     */
    public void close() 
        try 
            selector.close();
            socket.close();
         catch (IOException e) 
        
    

客户端代码:

/**
 * Nio聊天室客户端
 *
 * @author csp
 * @date 2021-12-09 17:03:33
 */
public class NioClient 
    public static void main(String[] args) 
        // 当前客户端的用户名
        String username = "lufei";
        // 为当前客户端开辟一个线程
        ClientThread client = new ClientThread(username);
        client.start();

        // 输入输出流
        BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));

        try 
            // 循环读取键盘输入
            String readline;
            while ((readline = sin.readLine()) != null) 
                if (readline.equals("bye")) 
                    client.close();
                    System.exit(0);
                
                // 发送消息
                client.send(username + ":" + readline);
            
         catch (IOException e) 
            e.printStackTrace();
        
    

运行测试:

启动运行测试一下效果!

服务端先启动,控制台打印:

服务端启动......

接着启动客户端,控制台打印:

连接服务器端成功!

这时候服务端会打印客户端的连接信息以及用户名等信息:

测试客户端向服务的发送消息,客户端控制台输入Hello 我是lufei!,这时候服务端会收到发送过来的消息内容:

我们可以再建立一个客户端启动类NioClient2,并将其启动,服务端会收到客户端2的消息:

让客户端1和客户端2之间发送消息:

服务端控制台打印:

这样,一个简单的聊天室就搭建成功了,如果小伙伴想自行完善,可以把代码拷贝一下,自己去设计自己想要实现的聊天室功能。

熟悉了NIO通信的小案例之后,我们通过一张图来分析一下其实现原理:

从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector中获得相应的SelectionKey,同时从SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。

二、Selector 选择器

1、Selector 继承体系

NIO中实现非阻塞 I/O的核心对象是Selector,Selector是注册各种I/O事件的地方,而且当那些事件发生时,就是Seleetor告诉我们所发生的事件。

使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:

  • (1) 向Selector对象注册感兴趣的事件。
  • (2) 从Selector中获取感兴趣的事件。
  • (3) 根据不同的事件进行相应的处理。

2、Selector 选择器的创建

在聊天室案例的NioServer服务端类中,选择器的初始化创建位于其构造函数中:

public NioServer() throws IOException 
    // 初始化Selector选择器
    // 也可以通过实现java.nio.channels.spi.SelectorProvider.openSelector()抽象方法自定义一个Selector。
    this.selector = Selector.open();
    // 初始化Channel通道
    // 初始化Buffer缓冲:
    // 初始化聊天室成员列表

Selector可以通过它自己的open()方法创建,借助java.nio.channels.spi.SelectorProvider类创建一个新的 Selector 选择器。也可以通过实现java.nio.channels.spi.SelectorProvider类的抽象方法openSelector()来自定义实现一个Selector。Selector 一旦创建将会一直处于 open 状态直到调用了close()方法为止。

我们跟进这个open()方法:

public abstract class Selector implements Closeable 
  
    protected Selector() 
  	
  	// 该方法返回一个Selector选择器:
    public static Selector open() throws IOException 
      	// 通过SelectorProvider构建选择器
        return SelectorProvider.provider().openSelector();
    
  	...

继续向下跟进SelectorProvider.provider()方法:

// 该方法位于SelectorProvider类中吗,可以获取能够构建Selector选择器的SelectorProvider对象
public static SelectorProvider provider() 
    synchronized (lock) 
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() 
                public SelectorProvider run() 
                        if (loadProviderFromProperty())
                            return provider;
                        if 以上是关于Java NIO三组件——Selecotr/Channel实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章

Tomcat NIO线程模型深入分析

Java NIO系列教程 Java NIO 概述

Java NIO系列教程 Java NIO 概述

Java NIO系列教程 Java NIO 概述

JAVA NIO 之 Selector 组件

JAVA NIO buffer (知识三)