Java NIO 篇
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java NIO 篇相关的知识,希望对你有一定的参考价值。
1. 什么是Java NIO?
1) Java 的NIO是基于 多路I/O 实现的,在多路I/O 复用的模型中有一个selector 的线程不断轮询并监听多个socket的状态,只有在socket有读写的时候,才会通知用户线程进行I/O 读写操作。
2) NIO Server 通过selector.select() 来获取查看通道里是否有时间到达,如果没有,那么用户线程一直会处于阻塞状态,多路I/O 复用只用了一个selector线程就能监听和管理多个socket 通道,因此I/O多路复用在连接数众多的时候有很大的优势。
优点: 当响应体很小的时候,能够节约很多管理资源。
缺点: 当响应题数据很大的时候,如果我们通过一个selector去轮询处理事件,那么一个线程的selector会成为性能的瓶颈。 在实际开发中,在多路复用的方法体内不做复杂的逻辑运算,一般只做接收信息和转发的作用,实际逻辑处理转交给后续的业务线程去处理。
3) 客户端与服务端的交互是通过 buffer 和channel 实现的, 读写都是先通过buffer然后再进入到channel里进行数据传输。
4) NIO三大组件
buffer: 缓冲区。
channel: 通道。
selector: 选择器,监听socketChannel, 一个socket对应一个channel, 一个selector可以监听并管理多个socket。
5) NIO 数据流向原理
理解了NIO的原理后,可以尝试写一个简单的NIO server和NIO client。
2. 手写一个NIO框架
2.1 手写NIO Server
package com.nio.demo.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* @author bingbing
* @date 2021/5/13 0013 18:55
*/
public class MyServer {
private int size = 1024;
private ServerSocketChannel serverSocketChannel;
private ByteBuffer byteBuffer;
private Selector selector;
private int remoteClientNum = 0;
private MyServer(int port) {
try {
initChannel(port);
} catch (Exception e) {
e.printStackTrace();
}
}
private void initChannel(int port) throws IOException {
// 1. 打开信道 Channel
serverSocketChannel = ServerSocketChannel.open();
// 2. 设置为非阻塞模式。
serverSocketChannel.configureBlocking(false);
// 3. 绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 4. 创建选择器
selector = Selector.open();
// 5. 向选择器注册通道。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 分配缓存区的大小
byteBuffer = ByteBuffer.allocate(size);
}
private void listen() throws Exception {
// 设置监听器,用于监听channel里的数据变化
System.out.println("开始监听请求...");
while (true) {
// n 表示有多少个channel准备好
int n = selector.select();
if (n == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 遍历selectionKey
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 获取Channel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 获取连接
SocketChannel channel = server.accept();
System.out.println("收到一个新的连接...");
// Channel 注册
registerChannel(selector, channel, SelectionKey.OP_READ);
// 客户端数+1
remoteClientNum++;
// 响应数据
write(channel, "hello !".getBytes());
}
if (key.isReadable()) {
read(key);
}
iterator.remove();
}
}
}
/**
* 读取数据
*
* @param selectionKey
*/
private void read(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count;
byteBuffer.clear();
// 从缓冲区里读取数据
System.out.println("client:");
while ((count = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
}
if (count < 0) {
socketChannel.close();
}
}
private void write(SocketChannel socketChannel, byte[] data) {
// 往缓存区里写数据
byteBuffer.clear();
byteBuffer.put(data);
byteBuffer.flip();
// 将缓存区的数据写入到管道
try {
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 绑定selector
*
* @param selector
* @param socketChannel
* @param op
*/
private void registerChannel(Selector selector, SocketChannel socketChannel, int op) {
// 将信道与selector绑定
if (socketChannel == null) {
return;
}
try {
socketChannel.configureBlocking(false);
socketChannel.register(selector, op);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MyServer server = new MyServer(8080);
try {
server.listen();
} catch (Exception e) {
e.printStackTrace();
}
}
}
解析: Server 端通过selector.select()方法监听 socketChannel, 如果有事件从channel过来,那么通过socketChannel.write(byteBuffer) 响应客户端, 另外判断selectionKey是否为可读,如果可读那么读取客户端发过来的数据。
2.2 手写NIO Client
package com.nio.demo.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author bingbing
* @date 2021/5/13 0013 18:55
*/
public class MyClient {
private int size = 1024;
private ByteBuffer byteBuffer;
private SocketChannel socketChannel;
public void connectServer() throws Exception {
socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
byteBuffer = ByteBuffer.allocate(size);
socketChannel.configureBlocking(false);
receive();
}
private void receive() throws IOException {
while (true) {
byteBuffer.clear();
int count;
while ((count = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
System.out.println("server:");
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
// 向服务器写数据
sendToServer("say , hi".getBytes());
byteBuffer.clear();
}
}
}
private void sendToServer(byte[] data) throws IOException {
byteBuffer.clear();
byteBuffer.put(data);
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
public static void main(String[] args) {
try {
new MyClient().connectServer();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 高性能的NIO框架Netty
Netty是一个基于IO多路复用的高性能的网络层框架, 是目前最流行的NIO框架, 分布式框架dubbo也集成了Netty, Netty 是Dubbo高并发的关键。
3.1 NettyServer关键类、接口和方法学习
基于Netty 4.1
在学习nettyServer前,我们可以先了解一下一些关键类和接口的用法,有助于帮我们熟悉netty代码的执行流程。
3.1.1 ServerSocketChannel接口
他有4个实现类,实现了初始化socketChannel通道、提供信道的socket、绑定ip、读取消息等方法。其中常用的3个实现类分别是EpollServerSocketChannel, KQueueServerSoketChannel,NioserverSocketChannel 。
为什么有这3个实现类?
这3个实现类其实对应的是3中IO多路复用的实现方式。其中epoll是linux环境下独有的IO多路复用的实现方式,kqueue是unix和mac os系统上运行的IO多路复用的实现方式,NioServerSocketChannel是通过selector选择器实现的IO多路复用。
3.1.2 ServerBootstrap类
Netty服务端的启动根类,继承AbastractBootStrap, 主要作用初始服务端的信道Channel(在init(Channel channel)方法里实现)。
在init()方法做server端需要的初始化动作,初始化信道Channel, 然后将EventLoopGroup 与ServerBootStrapAcceptor绑定用来监听socket。
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
3.1.3 group() 方法
group(EventLoopGroup group)
Specify the
EventLoopGroup
which is used for the parent (acceptor) and the child (client).
官方解释: 指定一个EventLoopGroup, 用来给父亲或者孩子,其实都是Channel。父亲表示的是管道的输入,孩子表示管道的输出。
通俗解释: 设置组, 指定一个事件循环组,组里成员分别是接收数据的成员和发送数据的Channel,分别称为 acceptor和client。
3.1.4 ChannelInboundHandlerAdapter类
重要方法:
1) channelActive(ChannelHandlerContext ctx) , 初始化ChannelHandlerContext实例, 可以通过该实例往pipeline里写数据。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
2) channelRead(ChannelHandlerContex t, Object msg); 该类读取从管道留过来的数据。
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
3) 在ChannelInboundHandlerAdapter的实现类里,一般需要实现Callable接口,该接口里的call方法在线程执行后被调用。
public synchronized Object call() throws Exception {
context.writeAndFlush(this.invocation);
wait();
return result;
}
3.2 NettyClient关键类、接口和方法学习
3.2.1 客户端启动类Bootstrap
bootstrap类是client的启动类,通过bootstrap 初始化信道绑定pipeline.
bootstrap里的重要方法
1) connect(hostName,port) 方法, 此方法用来连接服务端。
2) option() 方法,此方法来来自于抽象类AbstractBootstrap,用来设置响应参数,如 ChannelOption.TCP_NODELAY, true, 设置为无延迟的TCP响应。
以上是关于Java NIO 篇的主要内容,如果未能解决你的问题,请参考以下文章