Java NIO三组件——Selecotr/Channel实现原理解析
Posted 兴趣使然的草帽路飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO三组件——Selecotr/Channel实现原理解析相关的知识,希望对你有一定的参考价值。
目前很多高性能的Java RPC框架都是基于Netty实现的,而Netty的设计原理又离不开Java NIO。本篇笔记是对NIO核心三件套:缓冲区(Buffer)、选择器 (Selector)和通道(Channel),其中后两者选择器与通道实现原理的学习总结。
一、NIO聊天室入门案例
在学习原理之前,先来了解一个Java NIO实现聊天室的小案例,该案例只有三个类:Nioserver 聊天室服务端、NioClient 聊天室客户端、ClientThread 客户端线程。
服务端代码:
/**
* Nio聊天室服务端
*
* @author csp
* @date 2021-11-30 4:13 下午
*/
public class NioServer
/**
* 聊天室成员列表:
*/
Map<String, SocketChannel> memberChannels;
/**
* 端口
*/
private static final int PORT = 8080;
/**
* 选择器
*/
private Selector selector;
/**
* 管道
*/
private ServerSocketChannel server;
/**
* 缓冲
*/
private ByteBuffer buffer;
public NioServer() throws IOException
// 初始化Selector选择器
this.selector = Selector.open();
// 初始化Channel通道
this.server = getServerChannel(selector);
// 初始化Buffer缓冲:
this.buffer = ByteBuffer.allocate(1024);
// 初始化聊天室成员列表
memberChannels = new ConcurrentHashMap<>();
/**
* 初始化Channel通道
*
* @param selector
* @return
* @throws IOException
*/
private ServerSocketChannel getServerChannel(Selector selector) throws IOException
// 开辟一个Channel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 通道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 通道注册绑定Selector选择器,通道中数据的事件类型为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 通道绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
return serverSocketChannel;
/**
* 事件监听
*/
public void listen() throws IOException
System.out.println("服务端启动......");
try
// 无限循环
while (true)
// 作用:至少需要有一个事件发生,否则(如果count == 0)就继续阻塞循环
int count = selector.select();
if (count == 0)
continue;
// 获取SelectorKey的集合
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = keySet.iterator();
while (iterator.hasNext())
// 当前事件对应的SelectorKey
SelectionKey key = iterator.next();
// 删除当前事件:表示当前事件已经被消费了
iterator.remove();
if (key.isAcceptable())
// Accept类型事件:
// 通过key获取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 通过ServerSocketChannel获取SocketChannel
SocketChannel channel = server.accept();
// channel设置为非阻塞模式
channel.configureBlocking(false);
// channel绑定选择器
channel.register(selector, SelectionKey.OP_READ);
// 从channel中获取Host、端口等信息
System.out.println("客户端连接:"
+ channel.socket().getInetAddress().getHostName() + ":"
+ channel.socket().getPort());
else if (key.isReadable())
// Read类型事件
SocketChannel channel = (SocketChannel) key.channel();
// 用于解密消息内容
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
// 将消息数据从通道channel读取到缓冲buffer
//ByteBuffer buffer = ByteBuffer.allocate(50);
buffer.clear();
channel.read(buffer);
buffer.flip();
// 获取解密后的消息内容:
String msg = decoder.decode(buffer).toString();
if (!"".equals(msg))
System.out.println("收到:" + msg);
if (msg.startsWith("username="))
String username = msg.replaceAll("username=", "");
memberChannels.put(username, channel);
System.out.println("用户总数:" + memberChannels.size());
else
// 转发消息给客户端
String[] arr = msg.split(":");
if (arr.length == 3)
// 发送者
String from = arr[0];
// 接收者
String to = arr[1];
// 发送内容
String content = arr[2];
System.out.println(from + "发送给" + to + "的消息:" + content);
if (memberChannels.containsKey(to))
// 解密
CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
// 给接收者发送消息
memberChannels.get(to).write(encoder.encode(CharBuffer.wrap(from + ":" + content)));
catch (Exception e)
System.out.println("服务端启动失败......");
e.printStackTrace();
finally
try
// 先关闭选择器,在关闭通道
// 调用 close() 方法将会关闭Selector,同时也会将关联的SelectionKey失效,但不会关闭Channel。
selector.close();
server.close();
catch (IOException e)
e.printStackTrace();
public static void main(String[] args) throws IOException
// 服务端启动:
new NioServer().listen();
客户端线程类:
/**
* Nio聊天室客户端线程
*
* @author csp
* @date 2021-11-30 4:13 下午
*/
public class ClientThread extends Thread
/**
* 解密
*/
private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
/**
* 加密
*/
private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
/**
* 选择器
*/
private Selector selector = null;
/**
* 通道
*/
private SocketChannel socket = null;
/**
* 通道key
*/
private SelectionKey clientKey = null;
/**
* 用户名
*/
private String username;
public ClientThread(String username)
try
// 创建一个Selector
selector = Selector.open();
// 创建Socket并注册
socket = SocketChannel.open();
socket.configureBlocking(false);
clientKey = socket.register(selector, SelectionKey.OP_CONNECT);
// 连接到远程地址
InetSocketAddress ip = new InetSocketAddress("localhost", 8080);
socket.connect(ip);
this.username = username;
catch (IOException e)
e.printStackTrace();
/**
* 开辟读取事件的线程
*/
@Override
public void run()
try
// 监听事件(无限循环)
while (true)
// 监听事件
selector.select();
// 事件来源列表
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext())
SelectionKey key = it.next();
// 删除当前事件
it.remove();
// 判断事件类型
if (key.isConnectable())
// 连接事件
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending())
channel.finishConnect();
channel.register(selector, SelectionKey.OP_READ);
System.out.println("连接服务器端成功!");
// 发送用户名
send("username=" + this.username);
else if (key.isReadable())
// 读取数据事件
SocketChannel channel = (SocketChannel) key.channel();
// 读取数据
ByteBuffer buffer = ByteBuffer.allocate(50);
channel.read(buffer);
buffer.flip();
String msg = decoder.decode(buffer).toString();
System.out.println("收到:" + msg);
catch (IOException e)
e.printStackTrace();
finally
// 关闭
try
selector.close();
socket.close();
catch (IOException e)
/**
* 发送消息
*
* @param msg
*/
public void send(String msg)
try
SocketChannel client = (SocketChannel) clientKey.channel();
client.write(encoder.encode(CharBuffer.wrap(msg)));
catch (Exception e)
e.printStackTrace();
/**
* 关闭客户端
*/
public void close()
try
selector.close();
socket.close();
catch (IOException e)
客户端代码:
/**
* Nio聊天室客户端
*
* @author csp
* @date 2021-12-09 17:03:33
*/
public class NioClient
public static void main(String[] args)
// 当前客户端的用户名
String username = "lufei";
// 为当前客户端开辟一个线程
ClientThread client = new ClientThread(username);
client.start();
// 输入输出流
BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));
try
// 循环读取键盘输入
String readline;
while ((readline = sin.readLine()) != null)
if (readline.equals("bye"))
client.close();
System.exit(0);
// 发送消息
client.send(username + ":" + readline);
catch (IOException e)
e.printStackTrace();
运行测试:
启动运行测试一下效果!
服务端先启动,控制台打印:
服务端启动......
接着启动客户端,控制台打印:
连接服务器端成功!
这时候服务端会打印客户端的连接信息以及用户名等信息:
测试客户端向服务的发送消息,客户端控制台输入Hello 我是lufei!
,这时候服务端会收到发送过来的消息内容:
我们可以再建立一个客户端启动类NioClient2,并将其启动,服务端会收到客户端2的消息:
让客户端1和客户端2之间发送消息:
服务端控制台打印:
这样,一个简单的聊天室就搭建成功了,如果小伙伴想自行完善,可以把代码拷贝一下,自己去设计自己想要实现的聊天室功能。
熟悉了NIO通信的小案例之后,我们通过一张图来分析一下其实现原理:
从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector中获得相应的SelectionKey,同时从SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。
二、Selector 选择器
1、Selector 继承体系
NIO中实现非阻塞 I/O的核心对象是Selector,Selector是注册各种I/O事件的地方,而且当那些事件发生时,就是Seleetor告诉我们所发生的事件。
使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:
- (1) 向Selector对象注册感兴趣的事件。
- (2) 从Selector中获取感兴趣的事件。
- (3) 根据不同的事件进行相应的处理。
2、Selector 选择器的创建
在聊天室案例的NioServer服务端类中,选择器的初始化创建位于其构造函数中:
public NioServer() throws IOException
// 初始化Selector选择器
// 也可以通过实现java.nio.channels.spi.SelectorProvider.openSelector()抽象方法自定义一个Selector。
this.selector = Selector.open();
// 初始化Channel通道
// 初始化Buffer缓冲:
// 初始化聊天室成员列表
Selector
可以通过它自己的open()
方法创建,借助java.nio.channels.spi.SelectorProvider
类创建一个新的 Selector 选择器。也可以通过实现java.nio.channels.spi.SelectorProvider
类的抽象方法openSelector()
来自定义实现一个Selector。Selector 一旦创建将会一直处于 open 状态直到调用了close()
方法为止。
我们跟进这个open()
方法:
public abstract class Selector implements Closeable
protected Selector()
// 该方法返回一个Selector选择器:
public static Selector open() throws IOException
// 通过SelectorProvider构建选择器
return SelectorProvider.provider().openSelector();
...
继续向下跟进SelectorProvider.provider()
方法:
// 该方法位于SelectorProvider类中吗,可以获取能够构建Selector选择器的SelectorProvider对象
public static SelectorProvider provider()
synchronized (lock)
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>()
public SelectorProvider run()
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService()Tomcat NIO线程模型深入分析