Netty框架之NIO多路复用选择器

Posted 木兮君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之NIO多路复用选择器相关的知识,希望对你有一定的参考价值。

前言

小编今天继续讲解NIO,上篇博文Netty框架之深入了解NIO核心组件中有画nio的简易模型,但是小编的代码示例中仍然是阻塞的,这是因为没有加入多路复用选择器Selector,其实只要将管道设置为非阻塞模式,然后注册至Selector。当消息到达后才去通知,这样就避免了线程的阻塞。接下来小编来探索Selector。

选择器工作模型


当Channel管道注册到Selector的时候,写入资源的阻塞就不会在Thread发生,selector会监听数据的写入,当数据完全写入的时候就可以异步通知线程,这样我们就可以实时读取。这样就比较快速,这就是同步非阻塞。
选择器是如何实现这个功能的?在解决这个问题之前先来了解一下选择器中的核心组件

选择器核心组件

选择器核心组件有三个:管道(SelectableChannel)、选择器(Selector)、选择键(SelectorKey)。并非所有的Channel都支持注册到选择器,只有SelectableChannel子类才可以。当管道注册到Selector后就会返回一个Key,通过它就可以获取到关联的管道。接下来就分别介绍三个组件的作用:

SelectableChannel
核心功能有两个,第一通过configureBlocking 设置阻塞模式,默认为true,向选择器注册前必须置为false。
第二就是是调用register方法注册到指定管道,并且指定要监听的事件。可选的事件有:CONNECT(建立连接)、ACCEPT(接受连接)、READ(可读)、WRITE(可写) 。但并非所有管道都支持这四事件,可通过validOps()来查看当前管道支持哪些事件。

注册并监听事件如以下代码所示


ServerSocketChannel socketChannel= ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress(8080));
//设置非阻塞模式
socketChannel.configureBlocking(false);
Selector selector = Selector.open();   
// 监听接受连接事件       
key=socketChannel.register(selector, SelectionKey.OP_ACCEPT);

ServerSocketChannel 仅支持OP_ACCEPT事件
SocketChannel 支持 OP_CONNECT、OP_READ、OP_WRITE 三个事件
查看管道支持哪些事件可通过 validOps() 反回的值 然后进行‘&’运算 判断如下代码所示

//表示该管道支持 OP_CONNECT 事件监听
socketChannel.validOps() & SelectionKey.OP_CONNECT != 0

Selector
管道注册至Selector之后 会生成一个键(SelectorKey) 该键维护在Selector的keys中。并通过调用select 方法进行刷新,如果返回数大于0表示有指定数量的键状态发生了变更。

  • select():有键更新,立马返回。否则会一直阻塞直到有键更新为止。
  • select(long ):有键更新,立马返回。否则会阻塞参数指定毫秒时。
  • selectNow():无论有没有键更新都会立马返回

如果有键更新,接下来就可以调用selectedKeys 获取更新的键集了。

SelectionKey
选择键用于关联管道与选择器,并监听维护管道1至多个事件,监听事件可在注册时指定,也可后续通过调用SelectionKey.interestOps 来改变。

// 同时监听读写事件
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);

此外SelectionKey还有如下主要功能:

  • channel() 获取管道
  • 判断状态:
    • isAcceptable() 管道是否处于Accept状态
    • isConnectable 管道是否处于连接就绪状态
    • isReadable 管道是否处于读取就绪状态
    • isWritable 管道是否处于写就续状态
  • isValid() 判断该键是否有效,管道关闭、选择器关闭、键调用cancel()方法都会导致该键无效
  • cancel()取消管道注册(不会直接关闭管道)

简单代码示例及说明

以下是一个UDP的例子,通过 DatagramChannel 开放管道,并注册至Selector。然后由Selector代为监听消息的状态。

	@Test
    public void udpChannelTest() throws IOException 
        DatagramChannel datagramChannel = DatagramChannel.open();
        datagramChannel.bind(new InetSocketAddress(8080));
        //设置非阻塞
        datagramChannel.configureBlocking(false);
        Selector selector = Selector.open();
        // 注册读取就续事件
        SelectionKey register = datagramChannel.register(selector, SelectionKey.OP_READ);
        while (true) 
            int count = selector.select();
            if (count > 0) 
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                // 遍历就续集
                while (iterator.hasNext()) 
                    SelectionKey next = iterator.next();
                    //处理键中的channel数据
                    handle(next);
                    iterator.remove();
                

            
        
    

    private void handle(SelectionKey selectionKey) throws IOException 
        DatagramChannel channel = (DatagramChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //读取消息写到缓冲区
        channel.receive(buffer);
    

使用流程上述例子关键流程说明:

  1. 通过channel.register()注册管道。
  2. 通过selector.select()刷新已注册键的状态。
  3. selector.selectedKeys() 获取就续集并遍历。
  4. 处理键,即获取管道并读取消息。
  5. 从选择集中移除。

重点关注一下第 4、5步,如没有执行第4步(即读取管道中的消息),那么该管道目前还是处于读就续状态。当调用选择器select() 方法时会立马返回,造成死循环。如果执行了第四步但没有执行第5步,会导致其留存在选择集中,从而重复进行处理。

Selector中键集说明

在选择器中总共维护了三个键集,底层都是Set实现所以不会重复:

  • 全部键集(keys):所有向该选择器注册的键都放置于此(实现类SelectorImpl中为publicKeys)
  • 选择键集(selectedKeys):存放准备就续的键(实现类SelectorImpl中为publicSelectedKeys)
  • 取消键集(cancelledKeys) :存放已取消的键 (注意这个是在AbstractSelector中)

通过刷新或关闭选择器都会导致,键集发生变更。下图详细说明了键集更改过程

  • 调用select()会刷新键,将已就绪集添加至选择集中、清空取消键集并移除已取消键
  • 移除选择集,选择集不会被选择器,需自行调用Set.remove()进行移除
  • Cancel()或关闭选择器,关闭管道都会都会将键添加至取消集,但其不会被立马清除,只有下一次刷新时才会被 清空

自己实现一个心跳机制服务

public class EchoServerTest 
    @Test
    public void openServerTest() throws IOException 
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        new Thread(()->
            try 
                startServer(selector);
             catch (IOException e) 
                e.printStackTrace();
            
        ,"selectorIoThread").start();
        while (true);
    

    private void startServer(Selector selector) throws IOException 
        while (true) 
            int selectCount = selector.select(1000);
            if (selectCount > 0) 
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext())
                    SelectionKey next = iterator.next();
                    if(next.isValid() && next.isAcceptable())
                        accepted(next);
                    else if(next.isValid() && next.isReadable())
                        new Thread(()->
                            try 
                                requested(next);
                             catch (IOException e) 
                                e.printStackTrace();
                            
                        ,"echoWorkedThread").start();
                    
                    iterator.remove();
                    try 
                        Thread.sleep(1);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
            
        
    

    private void requested(SelectionKey next) throws IOException 
        SocketChannel channel = (SocketChannel)next.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.put(String.valueOf(System.currentTimeMillis()).getBytes());
        buffer.flip();
        channel.write(buffer);

    

    private void accepted(SelectionKey next) throws IOException 
        ServerSocketChannel channel = (ServerSocketChannel)next.channel();
        SocketChannel accept = channel.accept();
        accept.configureBlocking(false);
        accept.register(next.selector(),SelectionKey.OP_READ);
    

这边小编稍微讲解一下,上面开始的都简单就是打开serverSocketChannel然后注册到selector里面,之后使用线程启动,然后刷新键,先是获取连接,连接完毕后修改成SocketChannel 的注册和写操作,之后就可以测试了。

小编这边让线程睡了1毫秒大家想一下为什么?

测试结果(这边小编用了一个tcp udp工具):

总结

今天小编详细介绍了Selector,包括他的工作模型,核心组件,分别介绍了selectableChannel,Selector,SelectionKey的功能作用,通过简单代码之后,小编又讲述了各个键集的变化过程,最后实现了一个心跳机制的服务。接下来nio还会实战一下就正式进入netty了。希望大家期待并继续关注。谢谢加油努力吧。

以上是关于Netty框架之NIO多路复用选择器的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之线程模型与基础用法

Netty框架之线程模型与基础用法

netty为什么非阻塞

从网络I/O模型到Netty,先深入了解下I/O多路复用

Netty——NIO(Selector)

Netty——NIO(Selector)