Java NIO学习笔记 使用Selector客户端与服务器的通信

Posted 不能说的秘密go

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO学习笔记 使用Selector客户端与服务器的通信相关的知识,希望对你有一定的参考价值。

使用NIO的一个最大优势就是客户端于服务器自己的不再是阻塞式的,也就意味着服务器无需通过为每个客户端的链接而开启一个线程。而是通过一个叫Selector的轮循器来不断的检测那个Channel有消息处理。
简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的Set集合,进行后续的I/O操作。
由于select操作只管对selectedKeys的集合进行添加而不负责移除,所以当某个消息被处理后我们需要从该集合里去掉。

一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。

下面,我们通过NIO编程的序列图和源码分析来熟悉相关的概念,以便巩固我们前面所学的NIO基础知识。

下面,我们对NIO服务端的主要创建过程进行讲解和说明,作为NIO的基础入门,我们将忽略掉一些在生产环境中部署所需要的一些特性和功能(比如TCP半包等问题)。

步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,代码示例如下。

ServerSocketChannel server = ServerSocketChannel.open();

步骤二:绑定监听端口,设置连接为非阻塞模式,示例代码如下。

server.socket().bind(new InetSocketAddress(7777),1024);
        // 设置为非阻塞模式, 这个非常重要
        server.configureBlocking(false);

步骤三:创建Reactor线程,创建多路复用器并启动线程,代码如下。

        Selector selector = Selector.open();
        new Thread(new ReactorTask()).start();

步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下。

server.register(selector, SelectionKey.OP_ACCEPT);

步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

while(true)
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) 
                key = (SelectKey)it.next();
               //处理io
            

步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路,代码示例如下。

// 得到与客户端的套接字通道
SocketChannel channel = ssc.accept();

步骤七:设置客户端链路为非阻塞模式,示例代码如下。

channel.configureBlocking(false);

步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息,代码如下。

 channel.register(selector, SelectionKey.OP_READ);

步骤九:异步读取客户端请求消息到缓冲区,示例代码如下。

int readBytes = channel.read(byteBuffer);

步骤十:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。

Object message = null;
while (buffer.hasRemain()) 
    buffer.mark();
    message = decode(buffer);
    if(message == null)
       buffer.reset();
       break;
    

if(!buffer.hasRemain())
  buffer.clear();
else 
  buffer.compact();


//业务线程处理message

步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

 socketChannel.write(byteBuffer);

注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区,此处不赘述,后续会详细分析Netty的处理策略。

public class Nioservice 
    public  void init() throws IOException 
        Charset charset = Charset.forName("UTF-8");
        // 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
        Selector selector = Selector.open();
        // 创建ServerSocketChannel,并把它绑定到指定端口上
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(7777),1024);
        // 设置为非阻塞模式, 这个非常重要
        server.configureBlocking(false);
        // 在选择器里面注册关注这个服务器套接字通道的accept事件
        // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
        server.register(selector, SelectionKey.OP_ACCEPT);


        while (true) 
        //休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) 
                //如果key对应的Channel包含客户端的链接请求
                // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
                key=it.next();
                // 由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉
                it.remove();
                if (key.isAcceptable()) 
                    ServerSocketChannel ssc  = (ServerSocketChannel) key.channel();
                    // 得到与客户端的套接字通道
                    //ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。
                    //我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等。此处省掉
                    SocketChannel channel = ssc.accept();
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                    //将key对应Channel设置为准备接受其他请求
                    key.interestOps(SelectionKey.OP_ACCEPT);
                
                if (key.isReadable()) 
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    String content = "";
                    try 
                        int readBytes = channel.read(byteBuffer);
                        if (readBytes > 0) 
                            byteBuffer.flip(); //为write()准备
                            byte[] bytes = new byte[byteBuffer.remaining()];
                            byteBuffer.get(bytes);
                            content+=new String(bytes);
                            System.out.println(content);
                            //回应客户端
                            doWrite(channel);
                        
                        // 写完就把状态关注去掉,否则会一直触发写事件(改变自身关注事件)
                        key.interestOps(SelectionKey.OP_READ);
                     catch (IOException i) 
                        //如果捕获到该SelectionKey对应的Channel时出现了异常,即表明该Channel对于的Client出现了问题
                        //所以从Selector中取消该SelectionKey的注册
                        key.cancel();
                        if (key.channel() != null) 
                            key.channel().close();
                        
                    
                
            
        
    
    private  void doWrite(SocketChannel sc) throws IOException
        byte[] req ="服务器已接受".getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        if(!byteBuffer.hasRemaining())
            System.out.println("Send 2 Service successed");
        
    

现在我们来看编写客户端的流程:

步骤一:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址),示例代码如下。

   SocketChannel channel = SocketChannel.open();

步骤二:设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数,示例代码如下。

channel.configureBlocking(false);

步骤三:异步连接服务端,示例代码如下。

channel.connect(new InetSocketAddress("xxx.xxx.xxx.xxx",7777))

步骤四:判断是否连接成功,如果连接成功,则直接注册读状态位到多路复用器中,如果当前没有连接成功(异步连接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没有建立),示例代码如下。

       if(channel.connect(new InetSocketAddress("127.0.0.1",7777)))
            channel.register(selector, SelectionKey.OP_READ);
           //发送消息
          doWrite(channel, "66666666");
        else 
            channel.register(selector, SelectionKey.OP_CONNECT);
        

步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP ACK应答,示例代码如下。

channel.register(selector, SelectionKey.OP_CONNECT);

步骤六:创建Reactor线程,创建多路复用器并启动线程,代码如下。

        selector = Selector.open();
        new Thread(new ReactorTask()).start();

步骤七:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

while (!stop)
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext())
            

步骤八:接收connect事件进行处理并判断是否链接成功,示例代码如下。

if (key.isConnectable())
  if (channel.finishConnect()) 
  

步骤九:注册读事件到多路复用器,示例代码如下。

 channel.register(selector, SelectionKey.OP_READ);

步骤十:异步读客户端请求消息到缓冲区,示例代码如下。

int readBytes = channel.read(byteBuffer);

步骤十一:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。

Object message = null;
while (buffer.hasRemain()) 
    buffer.mark();
    message = decode(buffer);
    if(message == null)
       buffer.reset();
       break;
    

if(!buffer.hasRemain())
  buffer.clear();
else 
  buffer.compact();


//业务线程处理message

步骤十二:将发生对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

 socketChannel.write(byteBuffer);
public class NioClient 
    // 创建一个套接字通道,注意这里必须使用无参形式
    private Selector selector = null;
    static  Charset charset = Charset.forName("UTF-8");
    private volatile boolean stop = false;
    public  ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);
    public  void  init() throws IOException
        selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        // 设置为非阻塞模式,这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)
        channel.configureBlocking(false);
        if(channel.connect(new InetSocketAddress("127.0.0.1",7777)))
            channel.register(selector, SelectionKey.OP_READ);
           //发送消息
          doWrite(channel, "66666666");
        else 
            channel.register(selector, SelectionKey.OP_CONNECT);
        


        //启动一个接受服务器反馈的线程
      //  new Thread(new ReceiverInfo()).start();

        while (!stop)
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext())
                key = it.next();
                it.remove();
                SocketChannel sc = (SocketChannel) key.channel();
                // OP_CONNECT 两种情况,链接成功或失败这个方法都会返回true
                if (key.isConnectable())
                    // 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功
                    // 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中)
                    if (channel.finishConnect()) 
                       /* System.out.println("准备发送数据");
                        // 链接成功了可以做一些自己的处理
                        channel.write(charset.encode("I am Coming"));
                        // 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ
                        key.interestOps(SelectionKey.OP_READ);*/
                          sc.register(selector,SelectionKey.OP_READ);
                    //    new Thread(new DoWrite(channel)).start();
                      doWrite(channel, "66666666");
                    else 
                        //链接失败,进程推出或直接抛出IOException
                        System.exit(1);
                    
                 if(key.isReadable())
                //读取服务端的响应
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                     int readBytes = sc.read(buffer);
                    String content = "";
                    if (readBytes>0)
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        content+=new String(bytes);
                        stop=true;
                    else if(readBytes<0) 
                        //对端链路关闭
                        key.channel();
                        sc.close();
                    
                    System.out.println(content);
                    key.interestOps(SelectionKey.OP_READ);
                
            
        
    
private  void doWrite(SocketChannel sc,String data) throws IOException
        byte[] req =data.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        if(!byteBuffer.hasRemaining())
            System.out.println("Send 2 client successed");
        
    

启动客户端类的init()方法之后就会向服务器发送一个字符串,服务器接受到之后会向客户端回应一个。运行如上代码,结果正确。

直接在使用阻塞式IO的时候,在客户端与服务器之间传输时使用了一个经典范式,客户端使用维护一个队列来发送数据给服务器.

代码示例如下:

private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(8);
 public void connect() throws IOException 
        // 三次握手
        if (socket == null || socket.isClosed()) 
            socket = new Socket(InfoUtils.SOCKET_IP, InfoUtils.SOCKET_PORT);
        
        //发送消息
       new Thread(new SendMessage()).start();


 public class SendMessage implements Runnable
        @Override
        public void run() 
            try 
                OutputStream os = socket.getOutputStream();
                while (true)
                    String content = queue.take();
                    os.write(content.getBytes());
                
             catch (Exception e) 
                e.printStackTrace();
            
        
    

通过源码对比分析,我们发现NIO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总结如下。

(1)客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。

(2)SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。

3)线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。

JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,引人注目的是,Java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO,后续我们学习下如何利用NIO2.0编写AIO程序,还是以客户端服务器通信为例进行讲解。

以上是关于Java NIO学习笔记 使用Selector客户端与服务器的通信的主要内容,如果未能解决你的问题,请参考以下文章

Java NIO 之 Selector 练习

NIO学习

java NIO --selector

Java NIO 学习--Selector

Java IO学习笔记六:NIO到多路复用

读书笔记-NIO的工作方式