[Java] 非阻塞IO
Posted arseneyao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Java] 非阻塞IO相关的知识,希望对你有一定的参考价值。
一、概述
非阻塞式IO的主要组成部分为Channel、Buffer和Selector。
通道可以向缓冲区写入数据,也可以从缓冲区读取数据。选择器允许单线程处理多个通道。
二、通道
通道类似流。不同之处在于通道是双向的、可异步读写、必须经过缓冲区。
主要的通道实现有
- FileChannel:从文件读写数据。
- DatagramChannel:通过UDP读写网络中的数据。
- SocketChannel:通过TCP读写网络中的数据。
- ServerSocketChannel:监听新的TCP连接并自动创建SocketChannel。
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class Solution { public static void main(String[] args) { try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "rw")) { FileChannel inChannel = file.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(4096); int bytesRead; while ((bytesRead = inChannel.read(buffer)) != -1) { System.out.println("read: " + bytesRead); buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } buffer.clear(); } } catch (IOException exc) { exc.printStackTrace(); } } }
三、缓冲区
使用缓冲区的一般步骤:写入数据 -> 调用flip -> 读取数据 -> 调用clear或调用compact。
缓冲区的本质是一块被封装的可读写内存(其实就是数组),其部分重要属性如下:
- capacity:内存的容量,当内存不足以继续写入数据时必须进行清除
- position:写数据时表示当前位置,初始值为0,最大值为容量-1,写入后移动到下一个位置;读数据时被重置为0,读取后移动到下个位置。
- limit:写模式下表示可写入数据的最大容量;读模式下表示能读取数据的最大容量。
主要的缓冲区实现有ByteBuffer、MappedByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。
通过调用allocate获取指定大小的缓冲区实例,参数代表缓冲区可存储的该Buffer类型数据的数量。
部分方法说明如下:
- flip:将缓冲区从写模式切换至读模式,并将limit设为position、position设为0。
- rewind:将position设为0、limit保持不变,可重新读取缓冲区的数据。
- clear:将position设为0、limit设为capacity,相当于清空缓冲区。
- compact:将所有未读数据拷贝至缓冲区起始处,将position设置到最后一个未读数据后、limit设置为capacity,可写入数据且不覆盖未读数据。
- mark/reset:使用mark标记特定的position,之后使用reset回到mark所标记的position。
四、分散与聚集
分散(Scatter)指进行读操作时将数据写入多个缓冲区(当前缓冲区填满后才会写入下个缓冲区),代码如下:
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; public class Solution { public static void main(String[] args) { try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "r")) { ByteBuffer buffer1 = ByteBuffer.allocate(64); ByteBuffer buffer2 = ByteBuffer.allocate(64); ByteBuffer[] buffers = {buffer1, buffer2}; FileChannel channel = file.getChannel(); List<Long> list = new ArrayList<>(); long bytesRead; while ((bytesRead = channel.read(buffers)) != -1) { list.add(bytesRead); for (ByteBuffer buffer : buffers) { buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } buffer.clear(); } } System.out.println("read process: " + list.toString()); } catch (IOException exc) { exc.printStackTrace(); } } }
聚集(Gather)指进行写操作时将多个缓冲区的数据写入同一通道(当前缓冲区读取完毕才会读取下个缓冲区),代码如下:
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; public class Solution { public static void main(String[] args) { try (RandomAccessFile file = new RandomAccessFile("/home/user/nio_test.txt", "rw")) { ByteBuffer buffer1 = ByteBuffer.allocate(32); ByteBuffer buffer2 = ByteBuffer.allocate(32); ByteBuffer[] buffers = {buffer1, buffer2}; byte[] array = "Hello World!".getBytes(StandardCharsets.UTF_8); System.out.println(Arrays.toString(array)); for (byte c : array) { buffer1.put(c); buffer2.put(c); } buffer1.flip(); buffer2.flip(); FileChannel outChannel = file.getChannel(); outChannel.write(buffers); } catch (IOException exc) { exc.printStackTrace(); } } }
五、通道数据传输
如果两个通道中其中有一个是FileChannel,那么就可以将数据从一个通道传输到另一个通道。
FileChannel.transferForm可以将数据从源通道传输到FileChannel中。
FileChannel.transferTo可以将FileChannel的数据传输到其他通道去。
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; public class Solution { public static void main(String[] args) { try { RandomAccessFile src = new RandomAccessFile("/home/user/test1.txt", "r"); RandomAccessFile dst1 = new RandomAccessFile("/home/user/test2.txt", "rw"); RandomAccessFile dst2 = new RandomAccessFile("/home/user/test3.txt", "rw"); FileChannel inChannel = src.getChannel(); FileChannel outChannel1 = dst1.getChannel(); FileChannel outChannel2 = dst2.getChannel(); long position = 0;//写入初始位置 long count = inChannel.size();//申请字节数量 outChannel1.transferFrom(inChannel, position, count); inChannel.transferTo(position, count, outChannel2); } catch (IOException exc) { exc.printStackTrace(); } } }
六、选择器
选择器(Selector)可以检测多个通道,并监听通道是否对特定事件的做好准备。使用该组件可以实现单线程管理多通道,减少大量线程带来的资源占用。
通过调用工厂方法Selector.open获取选择器实例,使用Channel.register方法来注册选择器和指定其监听的一个或多个事件。
选择器可以监听Connect、Accept、Read和Write四种不同类型的事件,这四种事件的常量通过SelectionKey获取。
当注册完成后register方法会返回一个SelectionKey对象,该对象包含了interestOps、readyOps、Channel、Selector和附加对象。其中interestOps表示监听事件的集合、readyOps集合表示监听事件是否发生的集合。附加对象是用于标识选择器的可选属性。
import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; class Solution { public static void main(String[] args) { try { SelectableChannel channel = SocketChannel.open(); Selector selector = Selector.open(); channel.configureBlocking(false); int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; String attachedObject = "Hello!"; SelectionKey selectionKey = channel.register(selector, interestSet, attachedObject); interestSet = selectionKey.interestOps(); boolean isAcceptInterested = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isConnectInterested = (interestSet & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT; boolean isReadInterested = (interestSet & SelectionKey.OP_READ) == SelectionKey.OP_READ; boolean isWriteInterested = (interestSet & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE; System.out.println("interested in accept: " + isAcceptInterested); System.out.println("interested in connect: " + isConnectInterested); System.out.println("interested in read: " + isReadInterested); System.out.println("interested in write: " + isWriteInterested); boolean isAcceptable = selectionKey.isAcceptable(); boolean isConnectable = selectionKey.isConnectable(); boolean isReadable = selectionKey.isReadable(); boolean isWritable = selectionKey.isWritable(); System.out.println("acceptable: " + isAcceptable); System.out.println("connectable: " + isConnectable); System.out.println("readable: " + isReadable); System.out.println("writable: " + isWritable); attachedObject = (String) selectionKey.attachment(); System.out.println("attached object: " + attachedObject); } catch (IOException e) { e.printStackTrace(); } } }
一旦向选择器注册了一个或多个通道,就可以调用select方法获取已经准备就绪的通道数量。select()会阻塞直至有注册通道的事件就绪,select(long timeout)会阻塞指定时间后返回,selectNow()会立刻返回。当select的返回值表明已经有通道就绪,就可以调用Selector.selectionKeys方法来获取就绪通道的SelectionKey集合。
import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class Solution { public static void main(String[] args) { try (Selector selector = Selector.open()) { SelectableChannel channel = SocketChannel.open(); channel.configureBlocking(false); int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; channel.register(selector, interestSet); if (selector.select(1000) > 0) { Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { System.out.println(key.channel() + " is accepted"); } else if (key.isConnectable()) { System.out.println(key.channel() + " is connectable"); } else if (key.isReadable()) { System.out.println(key.channel() + " is readable"); } else if (key.isWritable()) { System.out.println(key.channel() + " is writable"); } iterator.remove(); } } else { System.out.println("no ready channels"); } } catch (IOException e) { e.printStackTrace(); } } }
七、通道实现
FileChannel是连接到文件的通道,它总是运行在阻塞模式下。
FileChannel的实例可通过InputStream、OutputStream或RandAccessFile来获取,之后便可读写数据了。
另外truncate可截取指定长度的文件内容并删除指定长度后的部分、force可强制将缓冲区中的数据写入硬盘。
import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; class Solution { public static void main(String[] args) { try (RandomAccessFile file = new RandomAccessFile("/home/user/Desktop/data.txt", "rw")) { FileChannel channel = file.getChannel(); channel.force(true); ByteBuffer buffer = ByteBuffer.allocate(64); buffer.put("c++ is the worst language in the world! ".getBytes(StandardCharsets.UTF_8)); buffer.flip(); channel.write(buffer); channel.position(0); buffer.clear(); buffer.put("java is the best language in the world! ".getBytes(StandardCharsets.UTF_8)); buffer.flip(); channel.write(buffer); System.out.println("file size before truncate: " + channel.size()); channel.truncate(32); System.out.println("file size after truncate: " + channel.size()); } catch (IOException e) { e.printStackTrace(); } } }
SocketChannel是连接到TCP套接字的通道,获取SocketChannel的方式有:
- 打开SocketChannel并连接到网络服务器。
- 新连接到达ServerSocketChannel后自动创建。
当设置SocketChannel为非阻塞模式之后,就可以异步调用connect、read和write了。在异步模式下,connect可能在连接建立前就返回了,需要调用finishConnect来确定连接是否建立完成;read可能在未读取到数据时就返回了,write可能在未写出数据时就返回了,需要对返回的字节数进行判断。
ServerSocketChannel可通过accept方法监听TCP连接,默认状态下当有新的连接时该方法会返回SocketChannel实例。在异步模式下,accept的返回值可能是SocketChannel对象或null。
//Server.java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class Server { public static void main(String[] args) { try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) { serverChannel.socket().bind(new InetSocketAddress(10086)); SocketChannel clientChannel; while ((clientChannel = serverChannel.accept()) != null) { ByteBuffer buffer = ByteBuffer.allocate(1024); int bytesRead = clientChannel.read(buffer); buffer.flip(); System.out.print("read: "); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } System.out.println(); String content = Integer.toString(bytesRead); buffer.clear(); buffer.put(content.getBytes()); buffer.flip(); clientChannel.write(buffer); System.out.println("write: " + content); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } } } //Client.java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Scanner; class Client { public static void main(String[] args) { try (SocketChannel channel = SocketChannel.open()) { channel.configureBlocking(false); channel.connect(new InetSocketAddress("127.0.0.1", 10086)); while (!channel.finishConnect()) { System.out.println("waiting for connecting..."); Thread.sleep(200); } ByteBuffer buffer = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); String content = scanner.nextLine(); byte[] bytes = content.getBytes(); buffer.put(bytes); buffer.flip(); while (channel.write(buffer) == 0) { System.out.println("waiting for writing..."); Thread.sleep(200); } buffer.clear(); System.out.println("write: " + content); while (channel.read(buffer) == 0) { System.out.println("waiting for reading..."); Thread.sleep(200); } buffer.flip(); System.out.print("read: "); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } System.out.println(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
DatagramChannel是可收发UDP包的通道。由于UDP是无协议网络,所以不能像其他通道那样读写。
//Server.java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; public class Server { public static void main(String[] args) { try (DatagramChannel channel = DatagramChannel.open()) { ByteBuffer buffer = ByteBuffer.allocate(256); String content = "Hello UDP!"; buffer.put(content.getBytes()); buffer.flip(); channel.send(buffer, new InetSocketAddress("127.0.0.1", 10086)); } catch (IOException e) { e.printStackTrace(); } } } //Client.java import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; class Client { public static void main(String[] args) { try (DatagramChannel channel = DatagramChannel.open()) { channel.socket().bind(new InetSocketAddress("127.0.0.1", 10086)); ByteBuffer buffer = ByteBuffer.allocate(256); channel.receive(buffer);//blocking before receiving data buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } } catch (IOException e) { e.printStackTrace(); } } }
以上是关于[Java] 非阻塞IO的主要内容,如果未能解决你的问题,请参考以下文章
Java网络编程——NIO的阻塞IO模式非阻塞IO模式IO多路复用模式的使用