Java BIO,NIO,AIO
Posted Cuzzz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java BIO,NIO,AIO相关的知识,希望对你有一定的参考价值。
一丶IO模型&Java IO
Unix为程序员提供了以下5种基本的io模型:
- blocking io: 阻塞io
- nonblocking io: 非阻塞io
- I/O multiplexing: io多路复用
- signal driven I/O:信号驱动io
- asynchronous I/O:异步io
但我们平时工作中说的最多是,阻塞
,非阻塞
,同步
,异步
1.阻塞非阻塞,同步异步
-
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
可用把上面的阻塞队列看作是外卖柜
put
方法就是外卖员放外卖,如果容量不够那么一直等待其他用户拿走外卖,这是阻塞。offer
方法也是外卖员放外卖,但是他发现容量不够的时候,返回false,然后采取其他行动,比如打电话喊你下来拿外卖。 -
同步与异步关注的是消息通信机制。同步是发起调用在没有得到结果之前,该调用就不返回。异步是发起调用后,这个调用就直接返回了。
消息队列中间件的作用之一就是异步,发送方将消息发送就立马返回了,不需要等待这个消息被消费者处理。
同步就是你打电话问外卖员外卖到哪里了,外卖员告知你之前你不挂断电话。
异步就是你外卖app上发消息问外卖员,发完消息你立马可用做其他的事情。
异步情况下你怎么知道外卖到哪里了昵?
-
通知
外卖员通过平台回复你
-
回调
你给外卖员注册了一个回调事件——收到消息后,请回电告知,然后你调用结束,继续处理你的事情,但是外卖员收到消息后,会回调进行电话。
-
2.Unix的io模型
io操作分为两步:
-
等待数据就绪
例如读文件的过程中需要等待磁盘扫描所需数据,等待数据到达内核缓冲区
-
将数据从内核空间拷贝到用户空间
对于一次读取IO的操作,数据并不会直接拷贝到应用程序的缓冲区(用户空间),它首先会被拷贝到操作系统内核的缓冲区(内核空间)中,然后才会从操作系统内核的缓冲区拷贝到应用程序的缓冲区。
2.1 blocking io阻塞io
首先是我们用户进行进行系统调用,产生中断,操作系统切换到内核态,随后是内核完成数据准备和数据从内核空间复制到用户空间,然后应用进程继续运行。
这里说的阻塞,是系统调用不会立即返回,而是需要阻塞知道数据准备完成,并拷贝到用户空间。
2.2 nonblocking io 非阻塞io
可看到,和阻塞io的区别在于,准备数据的这个过程,是应用程序不断进行系统调用,询问操作系统内核是否完成了数据准备,此系统调用不会阻塞直到数据准备完成,而是立马返回。
但是第二阶段,数据从内核空间复制到用户空间是阻塞的,这个过程通常是比较快速的,因为这时候已经有DMA控制器完成了数据从磁盘搬运到内存,只需要拷贝到用户态空间中即可。
2.3 I/O multiplexing io多路复用
可以看到IO多路复用的流程和 blocking io阻塞io
类似,甚至还会多一次系统调用。那么IO多路复用存在的意义是什么昵?
假设我们当前的进程是一个服务端程序,存在多个网络io需要处理,我们需要多个线程取处理多个网络io,并且多个线程都是阻塞在系统调用上的,这是对线程资源的浪费。
io多路复用的优点就是:可以使用一个线程监听多路io,这个线程阻塞与select
系统调用上,当多路io存在任何一个io可读的时候,线程将被唤醒,然后进行数据的拷贝,并进行处理,从而节省线程资源。
2.4 signal driven I/O信号驱动io
可以看到,信号驱动的io在数据准备阶段是非阻塞的,当操作系统完成数据准备后将发送信号来通知用户进程发生了某事件,用户进程需要编写对应的信号处理函数,在信号处理函数中阻塞与内核数据拷贝,待拷贝完成后对数据进行处理。
2.5 asynchronous I/O 异步io
上面四种模型都会在数据从内核空间,拷贝到用户空间
这一步发生阻塞,也就是说至少第二步是需要同步等待操作系统完成拷贝的。
异步io模型则解决了这个问题,应用程序只要通知内核要读取的套接字对象, 以及数据的接收地址, 则整个过程都是由内核独立来完成, 包括数据从内核空间向用户空间的拷贝,拷贝完成后再通过信号来通知用户进程。
2.java中的io模型
将阻塞
,非阻塞
,同步
,异步
进行组合
-
阻塞同步io
这就是java中的BIO
-
非阻塞同步io
这就是java中的NIO,java中的nio是通过io多路复用实现的
-
非阻塞异步io
这就是java中的AIO,java中的AIO也是通过io多路复用实现,呈现出异步的表象
二丶Java BIO
下面探讨下java中BIO实现Socket编程方面的不足
public static void main(String[] args) throws IOException
ExecutorService threadPool
= new ThreadPoolExecutor(10,10,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
// 1 创建一个socket server监听tcp 1111端口
ServerSocket serverSocket = new ServerSocket(1111);
// 2 阻塞式接受来自客户端的连接
while (true)
//这一步是阻塞的 阻塞直到有客户端连接上来
Socket socket = serverSocket.accept();
System.out.println(socket.getRemoteSocketAddress() + "连接到服务端");
// 3 为了不影响后续连接进来处理,使用多线程来处理连接
threadPool.execute(() -> process(socket));
private static void process(Socket socket)
try (OutputStream out = socket.getOutputStream())
byte[] buffer = new byte[1024];
int len;
while ((len = socket.getInputStream().read(buffer)) > 0)
System.out.println(socket.getRemoteSocketAddress() + "发送数据:" + new String(buffer, 0, len));
out.write(buffer, 0, len);
catch (Exception e)
e.printStackTrace();
上面代码实现了,如果客户端请求过来讲客户端请求原封不动的写回,可以看到为了实现实现服务端支持多个客户端的连接,我们使用的线程池。
首先 Socket socket = serverSocket.accept()
,这一步会阻塞直到有客户端连接上来(这一步无所谓,甚至避免了主线程无休止的自旋)
其次process
方法中拿到输入输出流写回的操作也是阻塞的,这一步需要使用操作系统提供的系统调用,将数据从网卡或者硬盘读入内核空间,然后从内核空间拷贝到用户空间,我们的java程序才可以进行读操作,写则反之。
由于read,write
这两个方法是阻塞式的,它需要阻塞直到系统调用完成,我们的程序傻傻阻塞等待,因此我们使用了线程池,希望一个线程处理一个客户端请求,阻塞也只阻塞线程池中的线程。但是process
方法中阻塞的这部分,会体现在我们线程池的线程,也就是说,线程池中存在一些线程阻塞于read
,write
函数。
这种模型的优点:
- 简单直接,可以让开发人员专注于编写process的业务逻辑
- 不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。
- 使用多线程利用多核心cpu的能力,当线程阻塞的时候,cpu可以切换时间片给其他线程
这种模型的缺点:
- 非常依赖于线程,线程是宝贵的资源,虽然使用线程池进行了复用,当前当大量请求到来的时候,我们无法无限制的开辟线程。众多的线程被挂起,被唤醒还会导致上下文切换频繁,cpu利用率降低
- 线程本身占用较大内存,过多的线程导致jvm内存岌岌可危
那么怎么解决上述的问题昵,能不能解放线程不让他们阻塞在read和write中,能读那就读,不能读那就继续处理其他socket?
三丶Java NIO
回顾这张图,我们上面说的解放线程不让他们阻塞在read和write中,能读那就读,不能读那就继续处理其他socket
,不正是上面非阻塞的方式,希望系统调用可以立即返回,而不是阻塞。
Java中的nio基于io多路复用实现了同步非阻塞的处理方式
public static void main(String[] args) throws IOException, InterruptedException
// 1 创建selector用来侦听多路IO消息 \'文件描述符\'
// selector 担任了重要的通知角色,可以将任意IO注册到selector上,通过非阻塞轮巡selector来得知哪些路IO有消息了
// 底层是epoll(linux下)
// 后续会把server端注册上来,有服务端被客户端连接上来的IO消息
// 也会把每个客户端连接注册上来,有客户端发送过来的数据
Selector selector = Selector.open();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 2 把server端注册上去
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 1111));
//配置为非阻塞
serverSocketChannel.configureBlocking(false);
//关心accept事件,
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true)
// 3 这一步是阻塞的,基于io多路复用中的select poll,epoll
// 这里可以设置等待事件
if (selector.select() == 0)
continue;
// 4 如果有至少一路IO有消息,那么set不为空
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys)
if (key.isAcceptable())
System.out.println("客户端连接");
// 因为我们只注册了serverSocketChannel这一个可以accept的所以这里用强转即可
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
// 5 当第一次客户端连接时,就将这个连接也作为channel注册上,他是可读型的
//当前只是有客户端连接上来了,但是并不代表可读,还需要DMA将网卡数据搬运到内存
socketChannel.register(selector, SelectionKey.OP_READ);
else if (key.isReadable())
// 6 因为步骤5把客户端连接也注册上来了,并且是可读上面的数据的,如果该channel被选出来说明有客户端数据来了
SocketChannel socketChannel = (SocketChannel) key.channel();
// 7 必须借助ByteBuffer接受和发送数据
byteBuffer.clear();
if (socketChannel.read(byteBuffer) <= 0)
continue;
byteBuffer.flip();
byte[] b = new byte[byteBuffer.limit()];
byteBuffer.get(b);
System.out.println(key + " 数据来了: " + new String(b));
byteBuffer.clear();
byteBuffer.put(b);
byteBuffer.flip();
socketChannel.write(byteBuffer);
// 8 非常重要一定要清理掉每个channel的key,来表示已经处理过了,不然下一次还会被select
selectionKeys.clear();
select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的,它还支持超时阻塞模式。这是一个线程监听多路io的体现,只要有一个事件就绪那么select就会返回。
socketChannel.configureBlocking(false)
将 socketChannel设置为非阻塞其读写操作都是非阻塞的,也就说如果无法读,那么read函数返回-1,将会让当前线程去遍历其他就绪的事件,而不是傻傻等待,这是非阻塞io的体现
。
四丶Java AIO
public static void main(String[] args) throws IOException, InterruptedException
AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(1111));
System.out.println(Thread.currentThread() + "开始监听1111端口");
serverChannel.accept(null, new CompletionHandler<>()
@SneakyThrows
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment)
// 递归注册accept
serverChannel.accept(attachment, this);
System.out.println(Thread.currentThread() + "有客户端连接上来了" + channel.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, null, new CompletionHandler<Integer, ByteBuffer>()
@SneakyThrows
@Override
public void completed(Integer len, ByteBuffer attachment)
// 递归注册read
channel.read(buffer, null, this);
buffer.flip();
System.out.println(channel.getRemoteAddress() + ":" + new String(buffer.array(), 0, len));
buffer.clear();
channel.write(ByteBuffer.wrap("HelloClient".getBytes()));
@Override
public void failed(Throwable exc, ByteBuffer attachment)
);
@Override
public void failed(Throwable exc, Object attachment)
);
Thread.sleep(Integer.MAX_VALUE);
在AIO
中,所有创建的通道都会直接在OS
上注册监听,当出现IO
请求时,会先由操作系统接收、准备、拷贝好数据,然后再通知监听对应通道的程序处理数据。
客户端的连接到来后同样会先注册到选择器上,但客户端的I/O
请求会先交由OS
处理,当内核将数据拷贝完成后才会分配一条线程处理。这一点不同于BIO和NIO,NIO和BIO在内核拷贝数据到用户态的这一步任然是阻塞的。
BIO,NIO,AIO详解
Java中的IO,BIO,NIO,AIO详解
IO
BIO,NIO和AIO的关系
- BIO:
java.io
包.基于流模型实现,使用同步,阻塞方式.即:读输入流或写输出流时,在读或写动作完成之前,读(写)线程一直阻塞.性能差. - NIO:
java.nio
包.可以构建多路复用,同步非阻塞的IO操作. - AIO:Java 1.7之后引入的包.提供异步非阻塞的IO操作.基于事件和回调机制实现的.程序操作后直接返回,不阻塞,后台完成处理后,由系统通知相应的线程进行后续的操作.
IO的分类
InputStream
,OutputStream
:基于字节操作的IO.Writer
,Reader
:基于字符操作的IO.File
:基于磁盘操作的IO.Socket
:基于网络操作的IO.
IO的使用
InputStream
的使用
InputStream inputStream = new FileInputStream("src\IOBIONIOAIO.md");
byte[] bytes = new byte[inputStream.available()];
inputStream.read(bytes);
String str = new String(bytes);
inputStream.close();
System.out.println(str);
OutputStream
的使用
OutputStream outputStream = null;
outputStream = new FileOutputStream("src\IOBIONIOAIO.md",true);
outputStream.write("
OutputStream测试".getBytes("utf-8"));
outputStream.close();
Writer
的使用
Writer writer = new FileWriter("src\IOBIONIOAIO.md",true);
writer.append("
使用Writer向文件写入数据");
writer.close();
Reader
的使用
Reader reader = new FileReader("src\IOBIONIOAIO.md");
BufferedReader br = new BufferedReader(reader);
StringBuffer sb = new StringBuffer();
String str = null;
while ((str = br.readLine()) != null){
sb.append(str+"
");
}
br.close();
reader.close();
System.out.println(sb.toString());
同步,异步,阻塞,非阻塞
同步与异步
同步
- 一个任务的完成依赖于另外一个任务.
- 只有被依赖的任务完成后,依赖的任务才算完成.
异步
- 依赖的任务通知被依赖的任务要完成的任务后,完成自身的任务即算作完成.
- 被依赖的任务是否真正完成,依赖的任务无法确定.
阻塞和非阻塞
阻塞
调用结果返回之前,当前线程会被挂起.调用线程只有在得到结果之后才返回.
非阻塞
在没有得到结果之前,不会阻塞当前线程.
线程可以在结果返回之前完成其他任务,提高了CPU的利用率.
问题:系统的线程切换增加,需要权衡增加的CPU使用时间和线程切换的成本.
同/异,阻/非阻塞的组合
组合方式 | 性能 |
---|---|
同步阻塞 | I/O性能差,CPU大部分在空闲状态 |
同步非阻塞 | 提升I/O性能,增加CPU消耗 |
异步阻塞 | 对I/O能够提升效率 |
异步非阻塞 | 集群之间的消息同步机制 |
文件读写
// 使用FileWriter向文件写入数据
public void fileWriterTest() throws IOException {
FileWriter fw = new FileWriter("src\IOBIONIOAIO.md",true);
fw.write("
来自FileWriter写入的内容");
fw.close();
}
// 使用FileReader读取文件的内容
public void fileReaderTest() throws IOException {
FileReader fr = new FileReader("src\IOBIONIOAIO.md");
BufferedReader br = new BufferedReader(fr);
StringBuffer sb = new StringBuffer();
String str;
while ((str = br.readLine())!=null){
sb.append(str+"
");
}
br.close();
fr.close();
System.out.println(sb.toString());
}
使用java.nio
包中的Files
实现对文件的读写:
public void filesWrite() throws IOException {
Files.write(Paths.get("src\IOBIONIOAIO.md"),"
使用Files向文件写入数据".getBytes(), StandardOpenOption.APPEND);
}
public void filesRead() throws IOException {
byte[] bytes = Files.readAllBytes(Paths.get("src\IOBIONIOAIO.md"));
System.out.println(new String(bytes).toString());
}
Socket和NIO的多路复用
传统的Socket实现
服务器只给客户端发消息.
客户端将接收到的消息打印.
/**
* 服务器端:
* 调用accept()阻塞线程等待客户端连接.
* 客户端连接后将数据传输过去
*/
public void servers(){
Thread thread = new Thread(() -> {
try {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
Socket socket = serverSocket.accept();
try (PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
printWriter.println("来自于服务器端的消息");
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
}
/**
* 客户端:
* 打开指定的socket端口,
* 使用BufferedReader读取端口传来的数据
*/
public void client(){
try (Socket socket = new Socket(InetAddress.getLocalHost(),port)){
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
br.lines().forEach(s-> System.out.println(s));
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
示意图:
NIO多路复用
public void servers(){
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 4, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
threadPool.execute(()->{
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()){
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(),port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
try(SocketChannel channel = ((ServerSocketChannel)key.channel()).accept()){
channel.write(Charset.defaultCharset().encode("来自服务器通过NIO写入数据"));
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
public void client(){
try (Socket cSocket = new Socket(InetAddress.getLocalHost(),port)){
BufferedReader br = new BufferedReader(new InputStreamReader(cSocket.getInputStream()));
br.lines().forEach(s-> System.out.println(s));
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
流程:
- 通过
Selector.open()
建立一个Selector
,充当调度员. - 创建一个
ServerSocketChannel
,并向Selector
注册,通过指定的SelectionKey.OP_ACCEPT
告诉调度员,关注的是新的连接请求. - 配置非阻塞模式,否则注册行为抛出异常.
Selector
阻塞在select
操作,当有Channel
发生接入请求时,就会被唤醒.
示意图:
参考:
以上是关于Java BIO,NIO,AIO的主要内容,如果未能解决你的问题,请参考以下文章