Java-NIO 之 Selector 与 Pipe

Posted jhxxb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java-NIO 之 Selector 与 Pipe相关的知识,希望对你有一定的参考价值。

关于阻塞与非阻塞:https://www.cnblogs.com/jhxxb/p/11272727.html

 

一、传统的 IO 流都是阻塞式的

当一个线程调用 read() 或 write() 时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。

因此,在网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

技术图片
package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Scanner;


public class TestBlockingNIO 

    // 客户端
    @Test
    public void client() throws IOException 
        // 获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        // 分配指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 发送到服务端
        Scanner scan = new Scanner(System.in);
        while (scan.hasNext()) 
            String str = scan.next();
            buf.put(str.getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();

            if ("exit".equals(str)) 
                break;
            
        


        // 接收服务端的反馈
        sChannel.shutdownOutput();
        int len = 0;
        while ((len = sChannel.read(buf)) != -1) 
            buf.flip();
            System.out.println(new String(buf.array(), 0, len));
            buf.clear();
        

        // 关闭通道
        sChannel.close();
    

    // 服务端
    @Test
    public void server() throws IOException 
        // 获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        // 绑定连接
        ssChannel.bind(new InetSocketAddress(9898));

        retry:
        while (true) 
            // 获取客户端连接的通道
            SocketChannel sChannel = ssChannel.accept();

            // 分配指定大小的缓冲区
            ByteBuffer buf = ByteBuffer.allocate(1024);

            // 接收客户端的数据
            while (sChannel.read(buf) != -1) 
                String str = new String(buf.array()).trim();
                if ("exit".equals(str)) 
                    break retry;
                
                buf.flip();
                System.out.println(str);
                buf.clear();
            

            // 发送反馈给客户端
            buf.put("服务端接收数据成功!".getBytes());
            buf.flip();
            sChannel.write(buf);

            // 关闭通道
            sChannel.close();
        

        // 关闭通道
        ssChannel.close();
    
View Code

 

二、Java NIO 是非阻塞式的

当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。

线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。

因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

注:NIO 的 IO 行为还是同步的。

/*
 * 使用 NIO 完成网络通信的三个核心:
 *
 * 1. 通道(Channel):负责连接
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 *
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 *
 * 2. 缓冲区(Buffer):负责数据的存取
 *
 * 3. 选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况
 * 可以监听的事件类型(可使用 SelectionKey 的四个常量表示)
 *     读: SelectionKey.OP_READ  (1)
 *     写: SelectionKey.OP_WRITE    (4)
 *     连接: SelectionKey.OP_CONNECT(8)
 *     接收: SelectionKey.OP_ACCEPT  (16)
 *
 * Selector 常用方法
 * Set<SelectionKey> keys():所有的 SelectionKey 集合。代表注册在该 Selector上的 Channel
 * selectedKeys():被选择的 SelectionKey 集合。返回此Selector的已选择键集
 * intselect():监控所有注册的 Channel,当它们中间有需要处理的 IO 操作时,该方法返回,并将对应得的 SelectionKey 加入被选择的 SelectionKey 集合中,该方法返回这些 Channel 的数量。
 * int select(long timeout):可以设置超时时长的 select() 操作
 * int selectNow():执行一个立即返回的 select() 操作,该方法不会阻塞线程
 * Selector wakeup():使一个还未返回的 select() 方法立即返回
 * void close():关闭该选择器
 */

1.TCP-SocketChannel

技术图片
package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class TestNonBlockingNIO 

    //客户端
    @Test
    public void client() throws IOException 
        // 获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));

        // 切换非阻塞模式
        sChannel.configureBlocking(false);

        // 分配指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);

        // 发送数据给服务端
        Scanner scan = new Scanner(System.in);

        while (scan.hasNext()) 
            String str = scan.next();
            buf.put((new Date().toString() + "\\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        

        // 关闭通道
        sChannel.close();
    

    //服务端
    @Test
    public void server() throws IOException 
        // 获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();

        // 切换非阻塞模式
        ssChannel.configureBlocking(false);

        // 绑定连接
        ssChannel.bind(new InetSocketAddress(9898));

        // 获取选择器
        Selector selector = Selector.open();

        // 将通道注册到选择器上, 并且指定“监听接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);

        // 轮询式的获取选择器上已经“准备就绪”的事件
        while (selector.select() > 0) 

            // 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while (it.hasNext()) 
                // 获取准备“就绪”的是事件
                SelectionKey sk = it.next();

                // 判断具体是什么事件准备就绪
                if (sk.isAcceptable()) 
                    // 若“接收就绪”,获取客户端连接
                    SocketChannel sChannel = ssChannel.accept();

                    // 切换非阻塞模式
                    sChannel.configureBlocking(false);

                    // 将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                 else if (sk.isReadable()) 
                    // 获取当前选择器上“读就绪”状态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();

                    // 读取数据
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    int len = 0;
                    while ((len = sChannel.read(buf)) > 0) 
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    
                

                // 移除当前 SelectionKey
                it.remove();
            
        
    
View Code

2.UDP-DatagramChannel

技术图片
package nio;

import org.junit.Test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

public class TestNonBlockingNIO2 

    @Test
    public void send() throws IOException
        DatagramChannel dc = DatagramChannel.open();

        dc.configureBlocking(false);

        ByteBuffer buf = ByteBuffer.allocate(1024);

        Scanner scan = new Scanner(System.in);

        while(scan.hasNext())
            String str = scan.next();
            buf.put((new Date().toString() + ":\\n" + str).getBytes());
            buf.flip();
            dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
            buf.clear();
        

        dc.close();
    

    @Test
    public void receive() throws IOException
        DatagramChannel dc = DatagramChannel.open();

        dc.configureBlocking(false);

        dc.bind(new InetSocketAddress(9898));

        Selector selector = Selector.open();

        dc.register(selector, SelectionKey.OP_READ);

        while(selector.select() > 0)
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while(it.hasNext())
                SelectionKey sk = it.next();

                if(sk.isReadable())
                    ByteBuffer buf = ByteBuffer.allocate(1024);

                    dc.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, buf.limit()));
                    buf.clear();
                
            

            it.remove();
        
    
View Code

 

三、Pipe(管道)

Java NIO 管道是 2 个线程之间的单向数据连接。Pipe 有一个 source 通道和一个 sink 通道。数据会被写到 sink 通道,从 source 通道读取。

技术图片
package nio;

import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class TestPipe 

    @Test
    public void test() throws IOException 
        // 获取管道
        Pipe pipe = Pipe.open();

        ByteBuffer buf = ByteBuffer.allocate(1024);
        buf.put("通过单向管道发送数据".getBytes());
        buf.flip();

        // 将缓冲区中的数据写入管道
        Pipe.SinkChannel sinkChannel = pipe.sink();
        sinkChannel.write(buf);


        ByteBuffer buf2 = ByteBuffer.allocate(1024);
        // 读取缓冲区中的数据
        Pipe.SourceChannel sourceChannel = pipe.source();
        int len = sourceChannel.read(buf2);
        System.out.println(new String(buf2.array(), 0, len));

        sourceChannel.close();
        sinkChannel.close();
    
View Code

技术图片

 


https://mp.weixin.qq.com/s?__biz=Mzg3MjA4MTExMw==&mid=2247484746&idx=1&sn=c0a7f9129d780786cabfcac0a8aa6bb7

以上是关于Java-NIO 之 Selector 与 Pipe的主要内容,如果未能解决你的问题,请参考以下文章

Java-NIO 之 Buffer 与 Channel

Java-NIO:通道(Channel)的原理与获取

Java-NIO:简介

Java-NIO:Channel聚集(gather)写入与分散(scatter)读取

Java-NIO:直接缓冲区与非直接缓冲区

Java-NIO:缓冲区(Buffer)的数据存取