Java NIO
Posted 安小
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO相关的知识,希望对你有一定的参考价值。
参考文章 http://www.cnblogs.com/geason/p/5774096.html
http://www.iteye.com/magazines/132-Java-NIO
阻塞I/O通信模型存在以下缺点:
1. 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
2. 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
同步非阻塞的NIO模型:
JDK 1. 4中新加入了 NIO( New Input/ Output) 类, 引入了一种基于通道和缓冲区的 I/O 方式。NIO 是一种同步非阻塞的 IO 模型。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,先将数据写入缓冲区,再由缓冲区交给线程,因此线程无需阻塞地等待IO。
1. 缓存
缓冲区本质上是一块可以读写数据的内存。这块内存被包装成NIO Buffer对象,使用 Buffer 读写数据一般遵循以下四个步骤:
- 写入数据到 Buffer;
- 调用 flip() 方法;
- 从 Buffer 中读取数据;
- 调用 clear() 方法或者 compact() 方法。
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能清空缓冲区:调用clear()或compact()方法。clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
NIO中的关键Buffer实现有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型。
Buffer的三个属性(capacity,position,limit)
position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。
(1) capacity
capacity表示Buffer的大小值。一旦Buffer满了,需要将其清空(通过读数据或者清除数据)才能继续写数据往里写数据。
(2) position
写模式下position表示当前的位置,初始值为0。当一个byte或long数据写到Buffer后, position会向前移动到下一个可插入数据的Buffer单元,最大可为capacity-1。
当将Buffer从写模式切换到读模式,position会被重置为0。当从Buffer的position处读取数据时,position向前移动到下一个可读的位置。
(3) limit
在写模式下,Buffer的limit表示你最多能往Buffer里写多少数据, 写模式下limit等于Buffer的capacity。
当切换Buffer到读模式时, limit会被设置成写模式下的position值,表示你最多能读到多少数据。换句话说,你能读到之前写入的所有数据。
2. 通道
通道和流的区别:
- Channel是双向的,既可以读又可以写,而流是单向的
- Channel可以进行异步的读写
- 对Channel的读写必须通过buffer对象
channel的实现 :
- FileChannel:从文件中读写数据。
- DatagramChannel:能通过UDP读写网络中的数据。
- SocketChannel:能通过TCP读写网络中的数据。
- ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。
// FileChannel的实现 RandomAccessFile aFile = new RandomAccessFile("data.txt", "rw"); FileChannel inChannel = aFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(1024); int bytesRead = inChannel.read(buf); while (bytesRead != -1) { System.out.println("Read " + bytesRead); buf.flip(); // 将buffer切换为读模式 while(buf.hasRemaining()){ System.out.print((char) buf.get()); // 读取缓存 } buf.clear(); // 清空缓存 bytesRead = inChannel.read(buf); } aFile.close();
3. Selector
Selector用于监听多个通道的事件(如连接打开,数据到达),因此单个线程可以监听多个数据通道。Selector允许单线程处理多个 Channel,使得线程无需阻塞地等待IO事件的就绪。如果你的应用打开了多个连接,但每个连接的流量都很低,使用Selector就会很方便:向Selector注册Channel,然后调用它的select()方法,该方法会一直阻塞直到某个注册通道的事件就绪。一旦这个方法返回,线程就可以处理这些事件(如新连接进来,数据接收等)。
这是在一个单线程中使用一个Selector处理3个Channel的图示:
Selector中注册的感兴趣事件有:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
select()方法
- select()阻塞到至少有一个通道在你注册的事件上就绪了,返回的int值表示有多少通道已经就绪。
- select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
- selectNow()不会阻塞,不管什么通道就绪都立刻返回,此方法执行非阻塞的选择操作。
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { int readyChannels = selector.select(); if(readyChannels == 0) continue; Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); // 手动移除SelectionKey实例 } }
SocketChannel服务端:
public class Nioserver { private Selector selector; public NioServer(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将通道管理器与通道绑定,并为该通道注册感兴趣的事件 System.out.println("服务器端启动成功"); } public void process() throws IOException { while(true) { selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while(ite.hasNext()) { SelectionKey key = ite.next(); if(key.isAcceptable()) { // 2. 接受客户端连接请求,并发送消息 ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获得客户端连接通道 SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.write(ByteBuffer.wrap("send message to client".getBytes())); channel.register(selector, SelectionKey.OP_READ); System.out.println("客户端请求连接事件"); } else if(key.isReadable()) { // 3. 接收客户端消息 SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); String message = new String(buffer.array()); System.out.println("receive message from client, size:" + buffer.position() + ", msg: " + message); } ite.remove(); } } } public static void main(String[] args) throws IOException { new NioServer(9989).process(); } }
ServerSocketChannel是一个基于通道的socket监听器。它同我们所熟悉的java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。当没有传入连接在等待时,ServerSocketChannel.accept( )会立即返回null。以下是ServerSocketChannel的代码实现。
SocketChannel客户端:
public class NioClient { private Selector selector; public NioClient(String ip, int port) throws IOException { SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress(ip, port)); selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); } public void process() throws IOException { while(true) { selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while(ite.hasNext()) { SelectionKey key = ite.next(); if(key.isConnectable()) { // 1. 连接服务端,并发送消息 SocketChannel channel = (SocketChannel) key.channel(); if(channel.isConnectionPending()) channel.finishConnect(); channel.configureBlocking(false); channel.write(ByteBuffer.wrap("send message to server".getBytes())); channel.register(selector, SelectionKey.OP_READ); } else if(key.isReadable()) { // 4. 接收服务端的消息 SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); String message = new String(buffer.array()); System.out.println("receive message from server, size:" + buffer.position() + ", msg: " + message); } ite.remove(); } } } public static void main(String[] args) throws IOException { new NioClient("127.0.0.1", 9989).process(); } }
输出结果:
服务器端启动成功
客户端请求连接事件
receive message from client, size:22, msg: send message to server
receive message from server, size:22, msg: send message to client
Java NIO提供了与标准IO不同的工作方式:
面向流与面向缓冲
Java IO是面向流的,每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。
Java NIO是基于通道和缓冲区进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。这增加了处理过程中的灵活性。但是需要检查该缓冲区中是否包含所有数据,还需确保不要覆盖缓冲区里尚未处理的数据。
阻塞与非阻塞IO
Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。
Java NIO是非阻塞的,当一个线程从某通道发送请求读取数据,它仅能得到目前可用的数据(读),如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入(写),这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
选择器(Selector)
Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。
以服务端为例,如果服务端的selector上注册了读事件,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。
分散(Scatter)/ 聚集(Gather)
分散:将Channel中的数据读到多个buffer中
聚集:是指数据从多个buffer写入到同一个channel
ByteBuffer header = ByteBuffer.allocate(128); ByteBuffer body = ByteBuffer.allocate(1024); ByteBuffer[] bufferArray = { header, body }; channel.read(bufferArray); // read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,再写入另一个buffer中 channel.write(bufferArray); // write()方法会按照buffer在数组中的顺序,将数据写入到channel
以上是关于Java NIO的主要内容,如果未能解决你的问题,请参考以下文章