[Java] 非阻塞IO

Posted arseneyao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java] 非阻塞IO相关的知识,希望对你有一定的参考价值。

一、概述

非阻塞式IO的主要组成部分为Channel、Buffer和Selector。

通道可以向缓冲区写入数据,也可以从缓冲区读取数据。选择器允许单线程处理多个通道。

 

二、通道

通道类似流。不同之处在于通道是双向的、可异步读写、必须经过缓冲区。

主要的通道实现有

  • FileChannel:从文件读写数据。
  • DatagramChannel:通过UDP读写网络中的数据。
  • SocketChannel:通过TCP读写网络中的数据。
  • ServerSocketChannel:监听新的TCP连接并自动创建SocketChannel。
技术分享图片
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Solution {
    public static void main(String[] args) {
        try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "rw")) {
            FileChannel inChannel = file.getChannel();

            ByteBuffer buffer = ByteBuffer.allocate(4096);
            int bytesRead;
            while ((bytesRead = inChannel.read(buffer)) != -1) {
                System.out.println("read: " + bytesRead);
                buffer.flip();
                while (buffer.hasRemaining()) {
                    System.out.print((char) buffer.get());
                }
                buffer.clear();
            }
        } catch (IOException exc) {
            exc.printStackTrace();
        }
    }
}
View Code

 

三、缓冲区

使用缓冲区的一般步骤:写入数据 -> 调用flip -> 读取数据 -> 调用clear或调用compact。

缓冲区的本质是一块被封装的可读写内存(其实就是数组),其部分重要属性如下:

  • capacity:内存的容量,当内存不足以继续写入数据时必须进行清除
  • position:写数据时表示当前位置,初始值为0,最大值为容量-1,写入后移动到下一个位置;读数据时被重置为0,读取后移动到下个位置。
  • limit:写模式下表示可写入数据的最大容量;读模式下表示能读取数据的最大容量。

主要的缓冲区实现有ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。

通过调用allocate获取指定大小的缓冲区实例,参数代表缓冲区可存储的该Buffer类型数据的数量。

部分方法说明如下:

  • flip:将缓冲区从写模式切换至读模式,并将limit设为position、position设为0。
  • rewind:将position设为0、limit保持不变,可重新读取缓冲区的数据。
  • clear:将position设为0、limit设为capacity,相当于清空缓冲区。
  • compact:将所有未读数据拷贝至缓冲区起始处,将position设置到最后一个未读数据后、limit设置为capacity,可写入数据且不覆盖未读数据。
  • mark/reset:使用mark标记特定的position,之后使用reset回到mark所标记的position。

 

四、分散与聚集

分散(Scatter)指进行读操作时将数据写入多个缓冲区(当前缓冲区填满后才会写入下个缓冲区),代码如下:

技术分享图片
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;

public class Solution {
    public static void main(String[] args) {
        try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "r")) {
            ByteBuffer buffer1 = ByteBuffer.allocate(64);
            ByteBuffer buffer2 = ByteBuffer.allocate(64);
            ByteBuffer[] buffers = {buffer1, buffer2};

            FileChannel channel = file.getChannel();
            List<Long> list = new ArrayList<>();
            long bytesRead;
            while ((bytesRead = channel.read(buffers)) != -1) {
                list.add(bytesRead);

                for (ByteBuffer buffer : buffers) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        System.out.print((char) buffer.get());
                    }
                    buffer.clear();
                }
            }
            System.out.println("read process: " + list.toString());
        } catch (IOException exc) {
            exc.printStackTrace();
        }


    }
}
View Code

聚集(Gather)指进行写操作时将多个缓冲区的数据写入同一通道(当前缓冲区读取完毕才会读取下个缓冲区),代码如下:

技术分享图片
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Solution {
    public static void main(String[] args) {
        try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "rw")) {
            ByteBuffer buffer1 = ByteBuffer.allocate(32);
            ByteBuffer buffer2 = ByteBuffer.allocate(32);
            ByteBuffer[] buffers = {buffer1, buffer2};
            byte[] array = "Hello World!".getBytes(StandardCharsets.UTF_8);
            System.out.println(Arrays.toString(array));
            for (byte c : array) {
                buffer1.put(c);
                buffer2.put(c);
            }
            buffer1.flip();
            buffer2.flip();

            FileChannel outChannel = file.getChannel();
            outChannel.write(buffers);
        } catch (IOException exc) {
            exc.printStackTrace();
        }
    }
}
View Code

 

五、通道数据传输

如果两个通道中其中有一个是FileChannel,那么就可以将数据从一个通道传输到另一个通道。

FileChannel.transferForm可以将数据从源通道传输到FileChannel中。

FileChannel.transferTo可以将FileChannel的数据传输到其他通道去。

技术分享图片
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class Solution {
    public static void main(String[] args) {
        try {
            RandomAccessFile src = new RandomAccessFile("/home/user/test1.txt", "r");
            RandomAccessFile dst1 = new RandomAccessFile("/home/user/test2.txt", "rw");
            RandomAccessFile dst2 = new RandomAccessFile("/home/user/test3.txt", "rw");

            FileChannel inChannel = src.getChannel();
            FileChannel outChannel1 = dst1.getChannel();
            FileChannel outChannel2 = dst2.getChannel();

            long position = 0;//写入初始位置
            long count = inChannel.size();//申请字节数量

            outChannel1.transferFrom(inChannel, position, count);
            inChannel.transferTo(position, count, outChannel2);
        } catch (IOException exc) {
            exc.printStackTrace();
        }
    }
}
View Code

 

六、选择器

选择器(Selector)可以检测多个通道,并监听通道是否对特定事件的做好准备。使用该组件可以实现单线程管理多通道,减少大量线程带来的资源占用。

通过调用工厂方法Selector.open获取选择器实例,使用Channel.register方法来注册选择器和指定其监听的一个或多个事件。

选择器可以监听Connect、Accept、Read和Write四种不同类型的事件,这四种事件的常量通过SelectionKey获取。

当注册完成后register方法会返回一个SelectionKey对象,该对象包含了interestOps、readyOps、Channel、Selector和附加对象。其中interestOps表示监听事件的集合、readyOps集合表示监听事件是否发生的集合。附加对象是用于标识选择器的可选属性。

技术分享图片
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Solution {
    public static void main(String[] args) {
        try {
            SelectableChannel channel = SocketChannel.open();
            Selector selector = Selector.open();

            channel.configureBlocking(false);
            int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
            String attachedObject = "Hello!";
            SelectionKey selectionKey = channel.register(selector, interestSet, attachedObject);

            interestSet = selectionKey.interestOps();
            boolean isAcceptInterested = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
            boolean isConnectInterested = (interestSet & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;
            boolean isReadInterested = (interestSet & SelectionKey.OP_READ) == SelectionKey.OP_READ;
            boolean isWriteInterested = (interestSet & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
            System.out.println("interested in accept: " + isAcceptInterested);
            System.out.println("interested in connect: " + isConnectInterested);
            System.out.println("interested in read: " + isReadInterested);
            System.out.println("interested in write: " + isWriteInterested);
            
            boolean isAcceptable = selectionKey.isAcceptable();
            boolean isConnectable = selectionKey.isConnectable();
            boolean isReadable = selectionKey.isReadable();
            boolean isWritable = selectionKey.isWritable();
            System.out.println("acceptable: " + isAcceptable);
            System.out.println("connectable: " + isConnectable);
            System.out.println("readable: " + isReadable);
            System.out.println("writable: " + isWritable);

            attachedObject = (String) selectionKey.attachment();
            System.out.println("attached object: " + attachedObject);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

一旦向选择器注册了一个或多个通道,就可以调用select方法获取已经准备就绪的通道数量。select()会阻塞直至有注册通道的事件就绪,select(long timeout)会阻塞指定时间后返回,selectNow()会立刻返回。当select的返回值表明已经有通道就绪,就可以调用Selector.selectionKeys方法来获取就绪通道的SelectionKey集合。

技术分享图片
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Solution {
    public static void main(String[] args) {
        try (Selector selector = Selector.open()) {
            SelectableChannel channel = SocketChannel.open();

            channel.configureBlocking(false);
            int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
            channel.register(selector, interestSet);

            if (selector.select(1000) > 0) {
                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> iterator = set.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) {
                        System.out.println(key.channel() + " is accepted");
                    } else if (key.isConnectable()) {
                        System.out.println(key.channel() + " is connectable");
                    } else if (key.isReadable()) {
                        System.out.println(key.channel() + " is readable");
                    } else if (key.isWritable()) {
                        System.out.println(key.channel() + " is writable");
                    }
                    iterator.remove();
                }
            } else {
                System.out.println("no ready channels");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

 

七、通道实现

FileChannel是连接到文件的通道,它总是运行在阻塞模式下。

FileChannel的实例可通过InputStream、OutputStream或RandAccessFile来获取,之后便可读写数据了。

另外truncate可截取指定长度的文件内容并删除指定长度后的部分、force可强制将缓冲区中的数据写入硬盘。

技术分享图片
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

class Solution {
    public static void main(String[] args) {
        try (RandomAccessFile file = new RandomAccessFile("/home/user/Desktop/data.txt", "rw")) {
            FileChannel channel = file.getChannel();
            channel.force(true);

            ByteBuffer buffer = ByteBuffer.allocate(64);
            buffer.put("c++ is the worst language in the world!
".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            channel.write(buffer);

            channel.position(0);
            buffer.clear();
            buffer.put("java is the best language in the world!
".getBytes(StandardCharsets.UTF_8));
            buffer.flip();
            channel.write(buffer);

            System.out.println("file size before truncate: " + channel.size());
            channel.truncate(32);
            System.out.println("file size after truncate: " + channel.size());

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

SocketChannel是连接到TCP套接字的通道,获取SocketChannel的方式有:

  1. 打开SocketChannel并连接到网络服务器。
  2. 新连接到达ServerSocketChannel后自动创建。

当设置SocketChannel为非阻塞模式之后,就可以异步调用connect、read和write了。在异步模式下,connect可能在连接建立前就返回了,需要调用finishConnect来确定连接是否建立完成;read可能在未读取到数据时就返回了,write可能在未写出数据时就返回了,需要对返回的字节数进行判断。

ServerSocketChannel可通过accept方法监听TCP连接,默认状态下当有新的连接时该方法会返回SocketChannel实例。在异步模式下,accept的返回值可能是SocketChannel对象或null。

技术分享图片
//Server.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Server {
    public static void main(String[] args) {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.socket().bind(new InetSocketAddress(10086));
            SocketChannel clientChannel;
            while ((clientChannel = serverChannel.accept()) != null) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesRead = clientChannel.read(buffer);
                buffer.flip();
                System.out.print("read: ");
                while (buffer.hasRemaining()) {
                    System.out.print((char) buffer.get());
                }
                System.out.println();

                String content = Integer.toString(bytesRead);
                buffer.clear();
                buffer.put(content.getBytes());
                buffer.flip();
                clientChannel.write(buffer);
                System.out.println("write: " + content);
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



//Client.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

class Client {
    public static void main(String[] args) {
        try (SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("127.0.0.1", 10086));

            while (!channel.finishConnect()) {
                System.out.println("waiting for connecting...");
                Thread.sleep(200);
            }

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Scanner scanner = new Scanner(System.in);
            String content = scanner.nextLine();
            byte[] bytes = content.getBytes();
            buffer.put(bytes);
            buffer.flip();
            while (channel.write(buffer) == 0) {
                System.out.println("waiting for writing...");
                Thread.sleep(200);
            }
            buffer.clear();
            System.out.println("write: " + content);

            while (channel.read(buffer) == 0) {
                System.out.println("waiting for reading...");
                Thread.sleep(200);
            }
            buffer.flip();
            System.out.print("read: ");
            while (buffer.hasRemaining()) {
                System.out.print((char) buffer.get());
            }
            System.out.println();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
View Code

DatagramChannel是可收发UDP包的通道。由于UDP是无协议网络,所以不能像其他通道那样读写。

技术分享图片
//Server.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class Server {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            ByteBuffer buffer = ByteBuffer.allocate(256);
            String content = "Hello UDP!";
            buffer.put(content.getBytes());
            buffer.flip();
            channel.send(buffer, new InetSocketAddress("127.0.0.1", 10086));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



//Client.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

class Client {
    public static void main(String[] args) {
        try (DatagramChannel channel = DatagramChannel.open()) {
            channel.socket().bind(new InetSocketAddress("127.0.0.1", 10086));

            ByteBuffer buffer = ByteBuffer.allocate(256);
            channel.receive(buffer);//blocking before receiving data
            buffer.flip();
            while (buffer.hasRemaining()) {
                System.out.print((char) buffer.get());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

 

以上是关于[Java] 非阻塞IO的主要内容,如果未能解决你的问题,请参考以下文章

java同步非阻塞IO

java nio

非阻塞 IO 与异步 IO 以及 Java 中的实现

Java网络编程——NIO的阻塞IO模式非阻塞IO模式IO多路复用模式的使用

Java网络编程——NIO的阻塞IO模式非阻塞IO模式IO多路复用模式的使用

Java网络编程——NIO的阻塞IO模式非阻塞IO模式IO多路复用模式的使用