Java核心技术读书笔记11-6 Java AIO(NIO 2.0)
Posted 芝芝与梅梅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java核心技术读书笔记11-6 Java AIO(NIO 2.0)相关的知识,希望对你有一定的参考价值。
1.什么是Java AIO?
在Java SE 7 API中NIO相关库得到了扩充,新的NIO库被称为NIO 2.0,在这一版中Java提供了异步IO的能力,包括异步处理文件以及异步访问Socket数据。
异步的IO操作是IO模型的一种(关于IO模型,可以参考:Java NIO预备知识:I/O底层原理与网络I/O模型),它采用的是一种“订阅-通知”模式。即应用程序线程向操作系统注册IO监听,当应用程序线程发出IO请求后便可以不在关心IO处理,内核进行系统调用,当数据准备好时进行接收数据,接收完毕后还会自行将数据写入用户地址空间。写入完毕之后由内核主动通知应用程序。因此,AIO是异步非阻塞的。
对于Java的AIO来说,数据的读写操作直接调用API的read或write方法即可。
对于读操作来说,数据源中有数据可读,内核会将数据读入read方法的缓冲区(用户空间)然后通知应用程序。对于写操作来说,内核会将write方法传递到数据源,传递完毕后通知应用程序。两个方法都是异步的,这样的异步操作不是由发起I/O请求的线程来处理,而是通过线程池分配一个新的线程来进行处理。最后,对于异步操作的结果处理分为两种:返回一个Future对象来跟踪异步任务或注册时使用CompletionHandler在异步I/O完成后执行回调函数。
与第一版NIO的四种通道类似,Java SE 7为Java.nio.channels包新增了四种异步通道:
AsynchronousFileChannel
AsynchronousServerSocketChannel
AsynchronousSocketChannel
AsynchronousDatagramChannel
分别对应文件、TCP请求、TCP连接以及UDP的数据传输。
2.异步执行流程与异步通道组
2.1 异步执行的详细流程
1.首先,Java的异步任务分为两种:进行异步的IO操作与为IO结果执行回调函数。
2.Java是通过异步通道组中的线程池为异步任务分配线程,这个线程组可以在创建四种异步通道时指定,如果不指定则使用系统默认的组。
3.在用户线程执行任务时,会将IO请求提交给操作系统内核,如果存在CompletionHandler则一同注册到内核,然后立即返回继续执行线程的其余代码。
4.内核会使用系统调用完成相应的IO操作,例如读操作为:从数据源(文件、Socket)读数据写入用户线程的缓冲区。
5.IO完毕之后内核指定操作系统合适的Proactor(完成分发器——completion dispatcher)完成分发任务,即执行CompletionHandler指定的回调函数。
若不在创建通道时指定组,则会使用Java虚拟机维护的一个自动构造的系统范围的默认组,默认组有一个关联的线程池,可以根据需要创建新线程。如果没有配置默认组的ThreadFactory,则该组的线程池中的线程为守护线程。这意味着,如果你在主线程中构建了一个使用未配置的默认组的通道,有可能主线程会在异步任务执行完毕前退出,而此时守护线程也将因为无其他线程运行而退出,该任务将立即终止。
2.2 异步通道组
异步通道组关联一个线程池,在创建时指定。随后可以将该组作为参数传入一个异步通道。使用一个组的通道共享该组的线程池。
// ExecutorService executorService = Executors.newSingleThreadExecutor(); //一个单线程线程池
// AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
// AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);//缓存线程池
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());//固定线程池
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group); //建立通道并指定通道组
3.异步读写文件
和同步的NIO类似,AIO也需要使用buffer来读取数据。异步读写文件的方式分两种分别是使用Future对象和
3.1 使用Future对象接收异步任务
读数据
首先,需要使用AsynchronousFileChannel类的静态方法open来返回一个AsynchronousFileChannel对象,接下来是将管道连接至一个缓冲区并将数据读到缓冲区内。
与同步读写File的NIO不同,该方法不会返回读入了多少字节,而是会立即返回一个Future对象,该对象即为在多线程编程中遇到过的java.util.concurrent.Future,该类可以处理异步任务,并且可以使用isDone实例方法检查任务是否完毕以及get方法获取结果。下面为示例代码:
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = channel.read(buffer, 0); //读入数据到缓冲区,第二个参数为下标,指示文件的开始读取位置
while(!future.isDone()); //如果异步任务未完成则进行阻塞
buffer.flip();
FirstNIO.printBufByte(buffer, "AIOTestOne"); //一个自定义的打印缓冲区结果方法
读数据
写操作类似,写这个过程也是一个任务,同样返回Future对象并根据这个对象判断任务执行的状态。
buffer.put("abcdef".getBytes());
buffer.flip();
Future<Integer> future = channel.write(buffer, channel.size()); //从缓冲区写出数据到文件,第二个参数为下标,指示文件的开始写出位置
while (!future.isDone()) ; //如果异步任务未完成则进行阻塞
System.out.println("写入完成");
3.2 使用CompletionHandler
读数据
第二种方式是使用CompletionHandler,需要构建一个CompletionHandler的实现类对象,然后将该对象作为参数传入read方法。CompletionHandler接口的两个抽象方法分别是completed与failed,这两个方法承担了回调方法的作用,在read完毕后如果成功会调用completed,失败则会调用failed方法。因此可以在实现接口时重写的completed方法内部编写接收数据的处理逻辑。
使用CompletionHandler后,本线程发起一个I/O请求,调用read方法将这个请求提交给内核,内核处理完毕之后JVM会再使用一个新的线程,然后会由这个新线程使用回调方法。
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
ByteBuffer buffer = ByteBuffer.allocate(1024);
/**
* 第一个参数buffer会接收数据,第二个参数指定文件起始操作位置,第三个参数attachment也可以接收数据并可传递给completed方法
*/
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) { //第一个参数为读取字节数,第二个参数为attachment接收数据
System.out.println(Thread.currentThread().getName()); //一个新线程
System.out.println("读入了" + result + "个字节");
attachment.flip();
FirstNIO.printBufByte(attachment, "任务自动处理");
attachment.clear();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("读入数据失败");
}
});
Thread.sleep(10); //防止本线程结束时IO线程还未创建而无法接受数据。
写数据
写出数据类似,可以在completed方法中查看写出的字节数与写出数据。
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file/Test.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ); //获得一个异步文件管道
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("\\nnewStr".getBytes());
buffer.flip();
/**
* 第一个参数buffer会接收数据,第二个参数指定文件起始操作位置,第三个参数attachment也可以查看写出数据并可传递给completed方法
* 下述IO操作都由一个新线程执行
*/
channel.write(buffer, channel.size(), buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) { //第一个参数为写出字节数,第二个参数为attachment写出数据
System.out.println(Thread.currentThread().getName()); //一个新线程
System.out.println("写出了" + result + "个字节"); //写出字节数量
attachment.flip(); //可以查看写出的字节
FirstNIO.printBufByte(attachment, "任务自动处理");
attachment.clear();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("写出数据失败");
}
});
Thread.sleep(100); //防止本线程结束时IO线程还未创建而无法写出数据。
4.异步建立TCP连接与读写Socket数据
异步建立TCP连接与读写Socket数据与使用多路复用NIO的步骤非常相似,使用的分别是AsynchronousServerSocketChannel与AsynchronousSocketChannel,前者监听端口并通过accept方法获取TCP连接,并返回连接所用Socket。后者的一端连接有TCP所用的Socket用以传输数据。
4.1 使用Future对象接收异步任务
使用Future对象接收异步任务,对于异步Channel的建立连接、读、写都会返回Future对象,如果需要确保I/O确实完成了,需要该对象的isDone方法返回true。下面为对应的客户端与服务器端的代码。
//客户端
public class AIOClient {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
InetSocketAddress address = new InetSocketAddress("localhost", 8189);
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
Future<Void> connectFuture = channel.connect(address);
while(!connectFuture.isDone()); //等待直到建立连接
Future<Integer> writeFuture;
Future<Integer> readFuture;
if(connectFuture.get() == null){
System.out.println("已成功连接服务器");
readFuture = channel.read(buffer);
while (!readFuture.isDone());
while (true){
buffer.flip();
FirstNIO.printBufByte(buffer, "服务器端");
Scanner scanner = new Scanner(System.in);
String message = "";
message = scanner.nextLine();
buffer.put(message.getBytes());
buffer.flip();
writeFuture = channel.write(buffer);
while(!writeFuture.isDone());
buffer.clear();
readFuture = channel.read(buffer);
while (!readFuture.isDone());
}
}else{
System.out.println("无法建立连接");
}
}
}
//服务器端
public class AIOServer {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8189);
server.bind(address);
System.out.println("服务器启动成功,正在等待连接...");
while (true){
Future<AsynchronousSocketChannel> future = server.accept();
while(!future.isDone());
System.out.println("建立新连接成功");
Future<Integer> writeFuture;
Future<Integer> readFuture;
AsynchronousSocketChannel channel = future.get();
buffer.put("Hello, here is server, welcome!".getBytes());
buffer.flip();
writeFuture = channel.write(buffer);
while(!writeFuture.isDone());
buffer.clear();
readFuture = channel.read(buffer);
while (!readFuture.isDone());
while (true){
buffer.flip();
FirstNIO.printBufByte(buffer, "客户端");
buffer.put("message received".getBytes());
buffer.flip();
writeFuture = channel.write(buffer);
while(!writeFuture.isDone());
buffer.clear();
readFuture = channel.read(buffer);
while (!readFuture.isDone());
}
}
}
}
4.2 使用CompletionHandler
与读写文件类似,异步的ServerSocketChannel与SocketChannel的连接、读、写方法都有指定CompletionHandler对象的版本,在操作成功时会使用completed方法处理,否则会交给failed方法处理。
需要注意的是,与异步FileChannel不同,网络异步操作的连接、读写的操作可能是不断的,所以为了处理不断的操作,应该在completed与failed方法内再调用一次相关的方法以便继续处理下次操纵。
下面是将服务器端的连接操作改造为CompletionHandler版本的示例代码,注意server.accept(null, this);
public class AIOServer2 {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8193);
server.bind(address);
System.out.println("服务器启动成功,正在等待连接...");
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
server.accept(null, this); //处理接续进入的连接
System.out.println("建立新连接成功");
Future<Integer> writeFuture;
Future<Integer> readFuture;
AsynchronousSocketChannel channel = result;
buffer.put("Hello, here is server, welcome!".getBytes());
buffer.flip();
writeFuture = channel.write(buffer);
while (!writeFuture.isDone()) ;
buffer.clear();
readFuture = channel.read(buffer);
while (!readFuture.isDone()) ;
while (true) {
buffer.flip();
FirstNIO.printBufByte(buffer, "客户端");
buffer.put("message received".getBytes());
buffer.flip();
writeFuture = channel.write(buffer);
while (!writeFuture.isDone()) ;
buffer.clear();
readFuture = channel.read(buffer);
while (!readFuture.isDone()) ;
}
}
@Override
public void failed(Throwable exc, Object attachment) {
server.accept(null, this); //处理接续进入的连接
System.out.println("建立连接失败");
System.out.println(exc.getMessage());
}
});
while (true){
Thread.sleep(5000);
}
}
}
参考:
AIO 详解
17. Java NIO AsynchronousFileChannel异步文件通道
Proactor模式详解
以上是关于Java核心技术读书笔记11-6 Java AIO(NIO 2.0)的主要内容,如果未能解决你的问题,请参考以下文章