Java IO学习笔记六:NIO到多路复用

Posted Practitioner

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java IO学习笔记六:NIO到多路复用相关的知识,希望对你有一定的参考价值。

作者:Grey

原文地址:Java IO学习笔记六:NIO到多路复用

虽然NIO性能上比BIO要好,参考:Java IO学习笔记五:BIO到NIO

但是NIO也有问题,NIO服务端的示例代码中往往会包括如下代码:即:遍历所有的SocketChannel,获取能读写数据的客户端,当客户端数量非常多的时候,服务端要轮询所有连接的客户端拿数据(recv调用),很多调用是无意义的,这样会导致频繁的用户态切换成内核态,导致性能变差。

....
//遍历已经链接进来的客户端能不能读写数据
            for (SocketChannel c : clients) {  
                int num = c.read(buffer); 
                if (num > 0) {
                    buffer.flip();
                    byte[] aaa = new byte[buffer.limit()];
                    buffer.get(aaa);

                    String b = new String(aaa);
                    System.out.println(c.socket().getPort() + " : " + b);
                    buffer.clear();
                }
            }
...

多路复用技术可以解决NIO的这个问题,多个IO通过一个系统调用获得其中的IO状态,然后由程序对有状态的IO进行读写操作。在Linux系统中,多路复用的实现有:

  • 基于POSIX标准的SELECT
  • POLL (select只支持最大fd < 1024,如果单个进程的文件句柄数超过1024,select就不能用了。poll在接口上无限制)
  • EPOLL

其中SELECT和POLL类似,但是有一些区别,参考select和poll的区别

无论NIO,SELECT还是POLL,都是要遍历所有IO,询问状态,只不过遍历这件事到底是内核来做还是应用程序来做而已。

而epoll,可以看成是SELECT和POLL的增强,在调用select/poll时候,都需要把fd集合从用户态拷贝到内核态,但是epoll调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不做拷贝,而且epoll采用的是事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)。

更多内容可以参考:

Java的Selector封装了底层epoll和poll的API,可以通过指定如下参数来调用执行的内核调用, 在Linux平台,如果指定

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider

则底层调用poll,

指定为:

-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider

或者不指定,则底层调用epoll

源码参考:jdk8u-jdk

image

接下来,我们使用一套服务端代码,在Linux服务器上运行,分别指定底层用epoll和poll,并用strace来追踪其内核调用。

准备服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class SocketMultiplexingV1 {

    private Selector selector = null;
    int port = 9090;

    public void initServer() {
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        initServer();
        System.out.println("服务器启动了。。。。。");
        try {
            while (true) {
                Set<SelectionKey> keys = selector.keys();
                System.out.println(keys.size() + "   size");
                while (selector.select() > 0) {
                    //返回的有状态的fd集合
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            readHandler(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public static void main(String[] args) {
        SocketMultiplexingV1 service = new SocketMultiplexingV1();
        service.start();
    }
}

和服务端代码在同一目录下准备一个脚本 SocketMultiplexingV1.sh

rm -rf ${1}*
/usr/local/jdk/bin/javac SocketMultiplexingV1.java
strace -ff -o $1 /usr/local/jdk/bin/java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.${1}SelectorProvider SocketMultiplexingV1

执行:

./SocketMultiplexingV1 Poll

底层调用Poll

重新打开一个控制台,通过nc工具连接这个服务端

nc localhost 9090

服务端可以正常接收到连接

[root@io io]# ./SocketMultiplexingV1.sh Poll
服务器启动了。。。。。
1   size
-------------------------------------------
新客户端:/0:0:0:0:0:0:0:1:39724
-------------------------------------------

暂时先不要发送数据,此时,查看服务端的进程:

[root@io io]# jps
1712 Jps
1659 SocketMultiplexingV1

查看服务端目前关联的文件描述符

[root@io io]# lsof -p 1659
...
java    1659 root    4u  IPv6  25831       0t0       TCP *:websm (LISTEN)
...
java    1659 root    7u  IPv6  22508       0t0       TCP localhost:websm->localhost:39724 (ESTABLISHED)

其中4u为服务端监听的Socket文件描述符,7u为新连接进来的客户端Socket文件描述符。

通过nc客户端给服务端发送一些数据,客户端也可以正常收到服务端返回的数据

[root@io io]# nc localhost 9090
sdfasdfasd
sdfasdfasd

接下来停掉服务端和客户端, 查看追踪日志

[root@io io]# ll
total 2444
-rwxr-xr-x. 1 root root     106 Jun 10 19:25 mysh.sh
-rw-r--r--. 1 root root    1714 Jun 12 16:35 OSFileIO.java
-rw-r--r--. 1 root root    9572 Jun 17 19:58 Poll.1659
-rw-r--r--. 1 root root  215792 Jun 17 19:58 Poll.1660
-rw-r--r--. 1 root root    1076 Jun 17 19:58 Poll.1661
-rw-r--r--. 1 root root     983 Jun 17 19:58 Poll.1662
-rw-r--r--. 1 root root     850 Jun 17 19:58 Poll.1663
-rw-r--r--. 1 root root     940 Jun 17 19:58 Poll.1664
-rw-r--r--. 1 root root     948 Jun 17 19:58 Poll.1665
-rw-r--r--. 1 root root     885 Jun 17 19:58 Poll.1666
-rw-r--r--. 1 root root     948 Jun 17 19:58 Poll.1667
-rw-r--r--. 1 root root    1080 Jun 17 19:58 Poll.1668
-rw-r--r--. 1 root root  124751 Jun 17 19:58 Poll.1669
-rw-r--r--. 1 root root    1245 Jun 17 19:58 Poll.1670
-rw-r--r--. 1 root root    1210 Jun 17 19:58 Poll.1671
-rw-r--r--. 1 root root    2416 Jun 17 19:58 Poll.1672
-rw-r--r--. 1 root root   27498 Jun 17 19:58 Poll.1673
-rw-r--r--. 1 root root   27326 Jun 17 19:58 Poll.1674
-rw-r--r--. 1 root root   27602 Jun 17 19:58 Poll.1675
-rw-r--r--. 1 root root   26866 Jun 17 19:58 Poll.1676
-rw-r--r--. 1 root root    1141 Jun 17 19:58 Poll.1677
-rw-r--r--. 1 root root 1953818 Jun 17 19:58 Poll.1678
-rw-r--r--. 1 root root    2204 Jun 17 19:58 Poll.1831
-rw-r--r--. 1 root root    3440 Jun 17 19:48 SocketMultiplexingV1.class
-rw-r--r--. 1 root root    3315 Jun 17 19:13 SocketMultiplexingV1.java
-rwxr-xr-x. 1 root root     199 Jun 17 19:19 SocketMultiplexingV1.sh

其中Poll.1678为主线程日志, 我们一一看下整个调用过程

...
2535 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
...
2793 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE        T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28)         = 0
2794 listen(4, 50)                           = 0
...

以上两个调用对应了代码中建立Socket并绑定9090端口进行监听这个逻辑。

...
2772 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
...
2883 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{f        d=4, revents=POLLIN}])

以上调用对应了:

server.configureBlocking(false);

调用的poll方法表示一个新的文件描述符4u有POLLIN(POLLIN:There is data to read)的事件

...
2893 accept(4, {sa_family=AF_INET6, sin6_port=htons(39724), inet_pton(AF_        INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0},         [28]) = 7

...

2935 poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=PO        LLIN}], 3, -1) = 1 ([{fd=7, revents=POLLIN}])


这里说明接收了一个新的Socket连接,就是我们刚才用lsof看到的7u这个文件描述符。,调用了poll方法,说明一个新的文件描述符7u有POLLIN(POLLIN:There is data to read)的事件。

我们的代码中对于每次接收的客户端,也会把客户端设置为非阻塞,即:

client.configureBlocking(false);

对应的内核调用就是:

2926 fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0

以上就是poll调用对应内核函数的调用。

接下来切换成epoll模式,重新执行脚本

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服务器启动了。。。。。
1   size

用nc连接服务端

nc localhost 9090

服务端响应正常

[root@io io]# ./SocketMultiplexingV1.sh EPoll
服务器启动了。。。。。
1   size
-------------------------------------------
新客户端:/0:0:0:0:0:0:0:1:39726
-------------------------------------------

通过nc发送一些数据

[root@io io]# nc localhost 9090
asdfasd
asdfasd

也可以正常接收

接下来停掉服务端和客户端,查看主线程调用情况

[root@io io]# ll -h EPoll.*
-rw-r--r--. 1 root root 9.4K Jun 17 20:33 EPoll.2067
-rw-r--r--. 1 root root 212K Jun 17 20:33 EPoll.2068
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2069
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2070
-rw-r--r--. 1 root root  850 Jun 17 20:33 EPoll.2071
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2072
-rw-r--r--. 1 root root  948 Jun 17 20:33 EPoll.2073
-rw-r--r--. 1 root root  983 Jun 17 20:33 EPoll.2074
-rw-r--r--. 1 root root  850 Jun 17 20:33 EPoll.2075
-rw-r--r--. 1 root root 1.1K Jun 17 20:33 EPoll.2076
-rw-r--r--. 1 root root  31K Jun 17 20:33 EPoll.2077
-rw-r--r--. 1 root root 1.4K Jun 17 20:33 EPoll.2078
-rw-r--r--. 1 root root 1.3K Jun 17 20:33 EPoll.2079
-rw-r--r--. 1 root root 2.4K Jun 17 20:33 EPoll.2080
-rw-r--r--. 1 root root 9.0K Jun 17 20:33 EPoll.2081
-rw-r--r--. 1 root root 8.7K Jun 17 20:33 EPoll.2082
-rw-r--r--. 1 root root 8.6K Jun 17 20:33 EPoll.2083
-rw-r--r--. 1 root root 8.2K Jun 17 20:33 EPoll.2084
-rw-r--r--. 1 root root 1.2K Jun 17 20:33 EPoll.2085
-rw-r--r--. 1 root root 400K Jun 17 20:33 EPoll.2086
-rw-r--r--. 1 root root 2.2K Jun 17 20:33 EPoll.2109

vi EPoll.2068

其中新建Socket,Bind 9090端口,设置非阻塞和Poll都是相同的调用

...
  2539 socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
....
 2776 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
 ....
 2797 bind(4, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INE        T6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28)         = 0
 2798 listen(4, 50)                           = 0
....

但是一旦有新的连接进来

···
2852 epoll_create(256)                       = 7

···

2862 epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=140462610448389        }}) = 0

···
2888 epoll_wait(7, [{EPOLLIN, {u32=4, u64=140462610448388}}], 4096, -1) =         1

···

epoll_create1: 创建一个epoll实例,文件描述符
epoll_ctl: 将监听的文件描述符添加到epoll实例中,实例代码为将标准输入文件描述符添加到epoll中
epoll_wait: 等待epoll事件从epoll实例中发生, 并返回事件以及对应文件描述符

调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个 红黑树 用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件.

当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

源码:Github

参考资料:

深入理解 Epoll

Select、Poll、Epoll详解

select和poll的区别

以上是关于Java IO学习笔记六:NIO到多路复用的主要内容,如果未能解决你的问题,请参考以下文章

Java -- 每日一问:Java提供了哪些IO方式? NIO如何实现多路复用?

Java网络编程和NIO详解2:JAVA NIO一步步构建IO多路复用的请求模型

java nio的实现原理

详解 同步异步阻塞非阻塞 与 BIO NIO AIO区别多路复用

JAVA的 IO NIO AIO笔记

Java网络编程——NIO的阻塞IO模式非阻塞IO模式IO多路复用模式的使用