BIO
伴随着Java的发布,带来的是Socket套接字API。这套API实现是的同步阻塞IO模型。下面首先来看个示例,如何使用这套API完成一个echo服务端程序。
服务端:
public class MultiThreadedEchoServer {
public static void main(String[] args) {
ServerSocket echoServer = null;
Executor executor = Executors.newFixedThreadPool(5);
int port = 9999;
int i = 0;
System.out.println("服务器在端口[" + port + "]等待客户请求......");
try {
echoServer = new ServerSocket(port);
while (true) {
Socket clientRequest = echoServer.accept();
executor.execute(new ThreadedServerHandler(clientRequest, i++));
}
} catch (IOException e) {
System.out.println(e);
}
}
}
public class ThreadedServerHandler implements Runnable {
Socket clientSocket = null;
int clientNo = 0;
ThreadedServerHandler(Socket socket, int i) {
if (socket != null) {
clientSocket = socket;
clientNo = i;
System.out.println("创建线程为[" + clientNo + "]号客户服务...");
}
}
@Override
public void run() {
PrintStream os = null;
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintStream(clientSocket.getOutputStream());
String inputLine;
while ((inputLine = in.readLine()) != null) {
// 输入\'Quit\'退出
if (inputLine.equals("Quit")) {
System.out.println("关闭与客户端[" + clientNo + "]......" + clientNo);
os.close();
in.close();
clientSocket.close();
break;
} else {
System.out.println("来自客户端[" + clientNo + "]的输入: [" + inputLine + "]!");
os.println("来自服务器端的响应:" + inputLine);
}
}
} catch (IOException e) {
System.out.println("Stream closed");
}
}
}
客户端:
public class EchoClient {
public static void main(String[] args) {
Socket echoSocket = null;
PrintWriter out = null;
BufferedReader in = null;
try {
echoSocket = new Socket("127.0.0.1", 8080);
out = new PrintWriter(echoSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(
echoSocket.getInputStream()));
System.out.println("连接到服务器......");
System.out.println("请输入消息[输入\\"Quit\\"]退出:");
BufferedReader stdIn = new BufferedReader(new InputStreamReader(
System.in));
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println(in.readLine());
if (userInput.equals("Quit")) {
System.out.println("关闭客户端......");
out.close();
in.close();
stdIn.close();
echoSocket.close();
System.exit(1);
}
System.out.println("请输入消息[输入\\"Quit\\"]退出:");
}
} catch (UnknownHostException e) {
System.err.println("Don\'t know about host: PallaviÕs MacBook Pro.");
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn\'t get I/O for "
+ "the connection to: PallaviÕs MacBook Pro.");
System.exit(1);
}
}
}
在MultiThreadedEchoServer中创建了一个socket监听9999端口,接着在while循环中,通过echoServer.accept()
获取客户端的连接,
如果没有客户端连接,echoServer.accept()
会处理阻塞状态。
当获取到客户端连接时,就会从线程池中获取一个线程去处理客户端的连接。在该模型中,
- 服务端主线程负责接收客户端连接,并且生成的客户端链接投递到线程池中
- 线程池中的线程负责执行对客户端链接的数据读写业务。
BIO 时代,比较有名的产物就是 Tomcat 了。其底层的连接模型就是上面我们介绍的这种模式。
Tomcta中BIO的处理模式如下图所示:
NIO
为了解决高并发的问题,java1.4之后新增了NIO模型,该模型基于多路复用选择器监测连接状态在通知线程处理,从而达到非阻塞目的。比传统BIO能更好的支持高并发场景。首先来看下示例代码,仍然以上述的 echo 服务器为例,在 NIO 中可以如下实现:
服务端:
public class NIOEchoServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
private int num = 0;
public NIOEchoServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器在端口[" + port + "]等待客户请求......");
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 处理新接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssc.accept(); // Non blocking, never null
socketChannel.configureBlocking(false);
SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ);
sk.attach(num++);
}
if (key.isReadable()) {
// 读取数据
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("来自客户端[" + key.attachment() + "]的输入: [" + body.trim() + "]!");
if (body.trim().equals("Quit")) {
System.out.println("关闭与客户端[" + key.attachment() + "]......");
key.cancel();
sc.close();
} else {
String response = "来自服务器端的响应:" + body;
doWrite(sc, response);
}
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else {
}
}
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
public static void main(String[] args) {
NIOEchoServer timeServer = new NIOEchoServer(9999);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
Java NIO 主要由下面3部分核心组件组成:
- Buffer
- Channel
- Selector
来分析下这个程序,先来看第一段代码:
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
首先是创建了选择器实例java.nio.channels.Selector以及
服务端的 Socket 通道对象,也就是java.nio.channels.ServerSocketChannel。选择器是 Java 实现 IO 复用的核心组件。将服务端通道对象注册到选择器上,并且传入该通道关注的选择事件。服务端通道关注的是客户端通道的接入事件,也就是accept。到这里,准备工作就全部完成了。
之后是在一个while循环中,当stop未停止时,会一直处于while循环体中。接下来是一个阻塞等待的过程,
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
线程阻塞在java.nio.channels.Selector#select()调用上,等待有感兴趣的事件发生。由于一开始只注册了服务端通道的accept关注事件,因此此时能触发的只有客户端的接入。当有客户端接入后,select方法就从阻塞中返回。此时调用java.nio.channels.Selector#selectedKeys方法获的从select调用后产生的选择键合集。
选择键是一个标识,用于代表一个通道注册到了一个选择器上。因此选择器会包含三个重要属性:
- 选择器对象
- 通道对象
- 通道的关注事件标识
遍历合集,取出每一个选择键。判断选择键关注的事件类型来决定不同的处理策略。
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 处理新接入的请求消息
if (key.isAcceptable()) {
....
}
if (key.isReadable()) {
// 读取数据
....
}
} catch (Exception e) {
//通道连接关闭,可以取消这个注册键,后续不在触发。
}
}
我们重点看读取数据的操作:
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
nio中另一个组件,就是buffer,当服务端进行数据读写的时候,会将通道里的数据读取到缓冲区中,在进行后续的操作。因此nio也是面向缓冲区的IO模型。
通过while循环,仅仅使用一个选择器就可以处理多个通道上的任务。并且由于在获得选择键后,剩余的操作都可以较快的完成(从缓存区中读取数据和写入相比于内核等待数据的时间来说是固定可预测的),因此一个选择器就可以处理大量的通道事件,不会因为一个通道上的数据处理而大幅度延迟其他通道。
由BIO进化为的NIO: