NIO实现TCP的非阻塞通信

Posted Iris_xixi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NIO实现TCP的非阻塞通信相关的知识,希望对你有一定的参考价值。

这一次写NIO实现非阻塞通信时遇到了很多问题,我所理解的非阻塞是对于一个用户而言它的读写不会相互制约,而在此次编写过程中,发现其实非阻塞是相对于多个用户而言的。
看到网上一个对同步异步阻塞非阻塞的例子,感觉挺好的,就拷过来了:

老张爱喝茶,废话不说,煮开水。
出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,立等水开。(同步阻塞)
老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。
3 老张把响水壶放到火上,立等水开。(异步阻塞)
老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
老张觉得自己聪明了。


所谓同步异步,只是对于水壶而言。
普通水壶,同步;响水壶,异步。
虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。这是普通水壶所不能及的。
同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。

所谓阻塞非阻塞,仅仅对于老张而言。
立等的老张,阻塞;看电视的老张,非阻塞。
情况1和情况3中老张就是阻塞的,媳妇喊他都不知道。虽然3中响水壶是异步的,可对于立等的老张没有太大的意义。所以一般异步是配合非阻塞使用的,这样才能发挥异步的效用。

——来源网络,作者不明。 

作者:愚抄
链接:https://www.zhihu.com/question/19732473/answer/23434554
来源:知乎
著作权归作者所有,转载请联系作者获得授权。

使用的类:
1.ServerSocketChannel类:ServerSocket的代替类,支持非阻塞通信;
2.SocketChannel类:Socket的代替类,支持非阻塞通信;
3.Selector:为ServerSocketChannel监控接收连接就绪事件,为SocketChannel监控连接就绪,读就绪和写就绪事件。
4.SelectionKey:代表ServerSocketChannel和SocketChannel向Selector注册事件的句柄。

基本思路:
服务器端:
1.使用ServerSocketChannel类对地址端口号进行绑定,通过open()方法获取通道,并且设置为非阻塞;
2.通过Selector的open()方法,创建Selector对象;
3.进行注册连接就绪事件;
3.通过Selector的select()方法返回注册事件的数目,返回对象为SelectionKey,对SelectionKey进行遍历,当监听到连接事件的时候,就使用accept()连接并返回一个SocketChannel对象,对它注册读就绪事件;
4.收到消息之后进行反馈;
客户端:
1.通过IP和端口号进行连接;
2.通过Selector的open()方法创建该类对象;
3.进行注册读事件监听,读取服务器端的反馈;
4.输出消息;

代码:
服务器端:

package sency.lay.two;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 服务器端
 * 
 * @author sency
 * 
 */
public class NServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private int port = 8000;
    // 用于编码解码对象
    private Charset charse = Charset.forName("GBK");
    public static SelectionKey key = null;


    public NServer() throws IOException {
        // 创建Selector对象
        selector = Selector.open();
        // 打开一个服务器端通道
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
        serverSocketChannel.bind(isa);
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 为服务器通道注册连接就绪监听事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("=======Welcome!!!=======");
    }

    public static void main(String args[]) throws IOException {
        new NServer().service();
    }

    private void service() throws IOException {
        // TODO Auto-generated method stub
        // 通过select方法选择一组键,其相应的通道已为 I/O操作准备就绪
        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {

                key = it.next();
                // 删除正在处理的key
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = serverSocketChannel.accept();
                    if (sc.isConnected()) {
                        System.out.println("连接到:"
                                + sc.socket().getInetAddress() + "--"
                                + sc.socket().getPort());
                    }
                    // 设置为非阻塞模式
                    sc.configureBlocking(false);
                    // 为返回的SocketChannel对象注册读就绪和写就绪监听
                    sc.register(selector, SelectionKey.OP_READ);
                }

                if (key.isReadable()) {
                    try {
                        readMsg(key);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }

            }
        }
    }

    private void sendEcho(SelectionKey key,String content) throws IOException {
        if(content.length()>0){
            SocketChannel sc = (SocketChannel) key.channel();
            String echo = "#ServerEcho:"+content+"收到!";
            sc.write(charse.encode(echo));
            System.out.println(echo);
        }
    }

    private void readMsg(SelectionKey key) throws IOException {
        // TODO Auto-generated method stub
        // 获取该key所对应的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 创建一个ByteBuffer用于存放读取的数据
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String content = "";
        try {
            while (socketChannel.read(buffer) > 0) {
                // 将极限设置为当前位置,再将当前位置设置为0
                buffer.flip();
                // 对读取取内容进行解码
                content += charse.decode(buffer);
            }
            System.out.println("#Client:" + content);
            // 将key对应的Channel设置为准备下一次读取
            key.interestOps(SelectionKey.OP_READ);
            buffer.clear();
        } catch (IOException e) {
            key.cancel();
            if (key.channel() != null) {
                key.channel().close();
            }
        }
        sendEcho(key,content);
    }
}

客户端:

package sency.lay.two;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 客户端
 * 
 * @author sency
 * 
 */
public class NClient {
    private SocketChannel socketChannel;
    private Selector selector;
    private int port = 8000;
    private Charset charse = Charset.forName("GBK");
    public SelectionKey key = null;

    public NClient() throws IOException {
        selector = Selector.open();
        // 创建SocketChannel
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
        socketChannel = SocketChannel.open();
        socketChannel.connect(isa);
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    public static void main(String args[]) throws IOException {
        System.out.println("=======客户端=======");

        new NClient().talk();

    }

    private void talk() throws IOException {
          Scanner scan=new Scanner(System.in);
          new MyThread().start();
          while(scan.hasNextLine()){  
              //读取键盘的输入  
              String line=scan.nextLine();  
              //将键盘的内容输出到SocketChanenel中  
              socketChannel.write(charse.encode(line));  
            System.out.println("#Client:"+line);
          }  


    }

    class MyThread extends Thread{

        public void run() {
            // TODO Auto-generated method stub
            try {
                while (selector.select() > 0) {

                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {

                        key = it.next();
                        it.remove();

                        // TODO Auto-generated method stub
                        if (key.isReadable()) {
                            try {
                                receiveMsg(key);
                            } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }

                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }
    private void receiveMsg(SelectionKey key) throws IOException {
        // TODO Auto-generated method stub
        // 获取该key所对应的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 创建一个ByteBuffer用于存放读取的数据
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String content = "";
        try {
            while (socketChannel.read(buffer) > 0) {
                // 将极限设置为当前位置,再将当前位置设置为0
                buffer.flip();
                // 对读取取内容进行解码
                content += charse.decode(buffer);
            }
            System.out.println(content);
            // 将key对应的Channel设置为准备下一次读取
            key.interestOps(SelectionKey.OP_READ);

        } catch (IOException e) {
            if (key != null) {
                key.cancel();
                key.channel().close();
            }
        }
    }
}

水平太低啦,大家不要介意,努力学习ing!

以上是关于NIO实现TCP的非阻塞通信的主要内容,如果未能解决你的问题,请参考以下文章

4.NIO的非阻塞式网络通信

Java TCP/IP Socket基于NIO的TCP通信(含代码)

9NIO--阻塞式

Netty介绍及应用

14.5 基于TCP协议的网络编程2——非阻塞的网络编程

14.5 基于TCP协议的网络编程2——非阻塞的网络编程