Java NIO 之 Selector 练习

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO 之 Selector 练习相关的知识,希望对你有一定的参考价值。

    目的:本编文章主要想分享一下NIO方面的知识,由于最近几天工作不忙,趁机学习了下Java NIO Selector的相关知识;主要是实践操作的;具体的理论知识,可以参考网上的文章。


  • 测试用例主要有三种方式

    其实,是服务器端的逻辑不变,客户端有三种方式而已。

    服务器端:2个selector + channel, 客户端:一个channel

    服务器端:2个selector + channel, 客户端:多个channel(多线程方式)

    服务器端:2个selector + channel, 客户端:1个selector + channel


服务端,如果想要一个selector+channel的话,直接在initAndRegister()方法中,注释掉相关代码即可了,当然,客户端也要修改端口部分


  • 服务端代码:

package xingej.selector.test002;

//基本思路逻辑:
//------------------------------------------------------------------------------
//1、创建一个通道选择器Selector
//2、创建服务器端的ServerSocketChannel通道
//      设置ServerSocketChannel属性,
//      端口号的绑定
// 3、将通道选择器 与  ServerSocketChannel通道进行绑定,并向通道选择器注册感兴趣的事件
//------------------------------------------------------------------------------
// 4、通道选择器开始工作监听管道事件,调用select()方法,死循环的方式调用
//      如果用户感兴趣的事件发生,就去处理
//      否则,就阻塞在这里

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NioselectorServer {
    //这里声明了两个缓存区,发送和接收缓冲区
    //其实,一个就可以了
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private Selector selector;

    public void initAndRegister() throws Exception {
        //监听两个服务,因此需要两个端口的
        int listenPortA = 8081;
        int listenPortB = 8082;

        //创建第一个ServerSocketChannel对象实例
        ServerSocketChannel serverSocketChannelA = builderServerSocketChannel(listenPortA);
        //创建第二个ServerSocketChannel对象实例
        ServerSocketChannel serverSocketChannelB = builderServerSocketChannel(listenPortB);

        //创建通道选择器Selector
        selector = Selector.open();

        //将serverSocketChannelA 通道注册到通道选择器Selector里
        register(selector, serverSocketChannelA);
        //将serverSocketChannelB 通道注册到通道选择器Selector里
        register(selector, serverSocketChannelB);
    }

    //开始业务监听了
    public void listen() throws Exception {

        System.out.println("-----服务器-------开始接收请求-------OK--------");

        while (true) {
            int readyChannelNum = selector.select();
            if (0 == readyChannelNum) {
                continue;
            }
            //从选择器中的selectedKeys,可以获取此时已经准备好的管道事件
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                //从迭代器移除刚选好的键
                iterator.remove();
                dealSelectionKey(selector, selectionKey);
            }

            Thread.sleep(2000);

        }
    }

    //处理具体事件
    private void dealSelectionKey(Selector selector, SelectionKey selectionKey) throws Exception {
        if (selectionKey.isAcceptable()) {

            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel clientSocketChannel = serverSocketChannel.accept();
            clientSocketChannel.configureBlocking(false);
            clientSocketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else //读取客户端的内容
            if (selectionKey.isReadable()) {

                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                receiveBuffer.clear();
                StringBuilder msg = new StringBuilder();
                //将客户端发送过来的数据,从管道中读取到或者说写到 接收缓存里
                while (socketChannel.read(receiveBuffer) > 0) {
                    receiveBuffer.flip();
                    msg.append(new String(receiveBuffer.array()));
                    receiveBuffer.clear();//清楚数据,下次可以重新写入
                }
                socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                //打印输出从客户端读取到的信息
                System.out.println("------>:\t" + msg.toString());

//                socketChannel.close();
            } else
                //向客户端 发送数据
                if (selectionKey.isWritable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    sendBuffer.flip();
                    socketChannel.write(sendBuffer);
                    selectionKey.interestOps(SelectionKey.OP_READ);
                }
    }

    //将ServerSocketChannel 向 Selector进行注册,也就是将两者绑定在一起,
    private void register(Selector selector, ServerSocketChannel serverSocketChannel) throws Exception {
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    //创建ServerSocketChannel对象,并进行属性设置
    private ServerSocketChannel builderServerSocketChannel(int port) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置属性,如非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(port));
        return serverSocketChannel;
    }

    public static void main(String[] args) throws Exception {
        NIOSelectorServer nioSelectorServer = new NIOSelectorServer();
        //初始化 并 注册
        nioSelectorServer.initAndRegister();
        //开始监听
        nioSelectorServer.listen();
    }
}



  • 客户端请求方式一:

    模型如下:

    技术分享

代码如下:

    

package xingej.selector.test002;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        SocketChannel clientChannel = SocketChannel.open();
        clientChannel.connect(new InetSocketAddress("localhost", 8081));
        clientChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(new String ("hello, server! ").getBytes());
        buffer.flip();
        clientChannel.write(buffer);
        clientChannel.close();
    }
}
  • 客户端请求方式二:

   模型如下:

   技术分享

  代码如下:

      

package xingej.selector.test002;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Random;

public class NIOClient2 {
    public static void main(String[] args) throws Exception {

        String msg = "hello, NIO Server, I‘m ";

        int[] ports = {8081, 8082};
        for (int i = 0; i < 10; i++) {
            int index = i % 2;
            int port = ports[index];
            new Thread(new SocketChannelThread(msg + i +" client", port)).start();
        }
    }
}

class SocketChannelThread implements Runnable {
    //向服务器发送的消息体
    private String msg;
    private int port;

    private SocketChannel clientChannel;

    public SocketChannelThread(String msg, int port) {
        this.msg = msg;
        this.port = port;
    }

    @Override
    public void run() {
        try {
            //创建一个SocketChannel对象实例
            clientChannel = SocketChannel.open();
            //链接服务器
            clientChannel.connect(new InetSocketAddress("localhost", port));
            //设置通道未非阻塞模式
            clientChannel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int sendNum = new Random().nextInt(5) + 1;
            for(int i = 0; i < sendNum; i++) {
                buffer.put(new String(msg).getBytes());
                buffer.flip();
                //将缓冲区的内容发送到通道里
                clientChannel.write(buffer);
                //清理缓存区,下次重新写入
                buffer.clear();
                //每次发送完成后,休息几秒中,就是为了测试
                Thread.sleep(sendNum * 1000);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                //如果此通过处于开通状态的话,就关闭此通道
               if (clientChannel.isOpen()) {
                   System.out.println("-----关闭通道了------");
                   clientChannel.close();
               }
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


  • 客户端请求方式三:

   模型如下:

技术分享

代码如下:

package xingej.selector.test002;
//创建SocketChannel
//      链接服务器
//向服务器发送消息

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//
public class NIOSelectorClient {
    private static Selector selector;
    private static boolean flag = false;
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    public void initAndRegister() throws Exception{
        selector = Selector.open();
        createAndRegister(5);
    }

    private void createAndRegister(int socketChannelNum) throws Exception{
        ExecutorService socketThreadPool = Executors.newFixedThreadPool(5);
        CountDownLatch _latchs = new CountDownLatch(socketChannelNum);
        Integer[] ports = {8081, 8082};

        for(int i = 0; i < socketChannelNum; i++) {
            int port = ports[i % 2];
           socketThreadPool.submit(new SocketChannelThread(port, _latchs));
        }
        _latchs.await();
        socketThreadPool.shutdown();
        flag = true;

    }

    class SocketChannelThread implements Runnable{
        private CountDownLatch _latch;
        private int port;
        private SocketChannel socketChannel;

        public SocketChannelThread(int port, CountDownLatch _latch) {
            this.port = port;
            this._latch = _latch;
        }
        @Override
        public void run() {
            try {
                socketChannel= SocketChannel.open();
                socketChannel.configureBlocking(false);
                //1到10秒钟,随机休息
                //这里,添加时间的目的,是想模拟一下,不想同一时间,向服务器发起请求
                int time = (new Random().nextInt(10) + 1) * 1000;
                System.out.println("----此通道----休息的时间是------:\t" + time / 1000 + " 秒");
                Thread.sleep(time);
                System.out.println("--------2-------port:\t" + port);
                socketChannel.connect(new InetSocketAddress("localhost", port));
                System.out.println("--------3-------");
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //计数器,减一
                _latch.countDown();
            }
        }
    }

    public void listen() throws Exception{

        while (true) {
            System.out.println("-----客户端----准备好了----:\t");
            int readyChannelNum = selector.select();

            System.out.println("-----客户端----准备好的管道数量是-----:\t" + readyChannelNum);
            if (0 == readyChannelNum) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
               //下面的方法,就可以将selectionKey 键移除
                iterator.remove();
                if (selectionKey.isConnectable()) {
                    if (socketChannel.isConnectionPending()) {
                        socketChannel.finishConnect();
                        System.out.println("----客户端----链接完毕了-----");
                    }
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }else if (selectionKey.isWritable()) {
                    sendBuffer.clear();
                    sendBuffer.put("hello, server, I‘m client! Are you OK!!!".getBytes());
                    //flip()必须有的
                    sendBuffer.flip();
                    socketChannel.write(sendBuffer);
                    System.out.println("----客户端---向服务器---发送消息-----完毕----OK-----");
                    //这里注册的事件是write,
                    //效果就是,客户端不断的发送消息
                    //当然,也可以修改成其他事件,如SelectionKey.OP_READ
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                }

            }
            //每隔1秒中,就向服务器发送信息
            Thread.sleep(1000);
        }

    }

    public static void main(String[] args) throws Exception{
        NIOSelectorClient nioSelectorClient = new NIOSelectorClient();
        nioSelectorClient.initAndRegister();

        //死循环的方式,来监听标志位,
        //一旦标志位发生改变,就开始监听
        while (true) {
            if (flag) {
                nioSelectorClient.listen();
                break;
            }
        }
    }
}


  • 总结:

  •     1、在调用Selector.select()方法之前,最好将要使用的一个SocketChannel或者多个SocketChannel 完成注册功能;也就是说,所有SocketChannel完成注册事件后,才能调用select方法;

       不然,很容易出现死锁现象。

       如下图所示:

     技术分享

      解决措施方式一: 客户端请求方式三,刚开始并没有添加

CountDownLatch 计数器

,针对死锁才添加的。

主线程再调用监听方法时,最好使用观察者模式,目前这里使用了死循环的方式监听,感觉不太好。

  •    2、SocketChannel 通道属于长链接方式,客户端不再发送消息时,通道依旧存在,因此,可以调用Channel.close方法进行关闭


  • 学习方式的建议

    如果想更加深入的了解NIO,Selector的话,最好还是不断的进行测试,

    如在客户端添加Channel.close(),修改感兴趣的事件,等等

    去观察客户端,服务器端的现象,

    去总结,去研究源码,

    研究源码的目的,不光光是搞清楚背后的原理,

    还希望能够学到背后优秀的设计模式,设计思路,使用场景等等,

    扩展眼界


代码已分享到git上

https://github.com/xej520/xingej-nio






本文出自 “XEJ分布式工作室” 博客,请务必保留此出处http://xingej.blog.51cto.com/7912529/1969782

以上是关于Java NIO 之 Selector 练习的主要内容,如果未能解决你的问题,请参考以下文章

Day388.Selector&Pipe&fileLock文件锁&Path&Files&AsynchronousFileChannel异步通道 -NIO(代码片

JAVA NIO 之 Selector 组件

Java NIO之Selector(选择器)

JAVA-5NIO之Selector

java nio之selector

Java NIO之Selector(选择器)