详解BIOAIONIO Netty 知识点和工作原理
Posted 小吴吃肉啦~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解BIOAIONIO Netty 知识点和工作原理相关的知识,希望对你有一定的参考价值。
Netty框架 基础
三大网络编程
BIO
同步阻塞:服务器实现模式一个连接一个线程,既客户端有连接请求时,服务器就需要启动一个线程进行处理,如果这个连接不任何事情会造成不必要的线程线程开销
适用场景: 连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
NIO
**同步非阻塞:**服务器实现一个模式为一个线程处理多个请求(连接),即可达发送的连接请求,都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
适用场景:连接数目多且连接比较短,eg:聊天服务器,弹幕系,服务期间通讯。
基本介绍:是哟个线程从某个通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就说明都不会获取,而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。
AIO
异步非阻塞:AIO引入异步通道的概念,采用Proactor模式,简化了程序编写,有效的请求才其启动线程,它的特点是先由操作系统完成后才通知服务端程序去处理,一般适用于链接数较多的且连接时间较长的应用。
适用场景: 连接数目多 且 链接比较长 ,eg: 相册服务器
BIO编程简单流程
- 服务器端启动一个 ServiceSocket
- 客户端启动 Socket 对服务器进行通信,默认情况下服务器需要对每个客户 建立一个线程与之通讯
- 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则还会等待,或者被拒绝
- 如果有响应,客户端线程会的等待请求结束后,在继续执行
服务器端:
package com.atguigu.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Bioservice
public static void main(String[] args) throws Exception
//线程池机制
//思路
//1. 创建一个线程池
//2 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
//1. 创建一个线程池
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建一个serversocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动");
while (true)
// 监听,等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(new Runnable()
public void run()
handler(socket);
);
//编写一个handler方法,和客户端通讯
public static void handler(Socket socket)
try
System.out.println("线程信息 id = " + Thread.currentThread().getId() +
"线程名字 =" + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socker 获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端的发送的数据
while (true)
System.out.println("线程信息 id = " + Thread.currentThread().getId() +
"线程名字 =" + Thread.currentThread().getName());
int read = inputStream.read(bytes);
if (read != -1)
System.out.print(new String(bytes, 0, read));//输出客户端发送的数据
else
break;
catch (IOException e)
e.printStackTrace();
finally
System.out.println("关闭和client连接");
try
socket.close();
catch (IOException e)
e.printStackTrace();
客户端: cmd telnet 127.0.0.1 6666
NIO核心三大核心
NIO三大核心部分 : Channnel(通道) Buffer(缓冲区) Selector(选着器)
三者关系说明
- 每个channel 都会对应一个Buffer
- Selector对对应一个线程,一个线程可以对应多个channel
- 该图反应了有三个channel注册到 该selector
- 程序切换到哪个channel是有时间决定的,Event就是一个重要的概念
- Selector会根据不同的事件,在各个通道上切换
- Buffer就是一个内存块,底层是有一个数组
- 数据的读取写入是通过Buffer,这个和BIO,BIO中要么是输入流,或者是输出流 不能双向,但是NIO的Buffer 是可以读也可以写,需要flip方法的切换
- channel是双向的,可以返回底层操作系统的情况、
Buffer(缓冲区)
buffer子类(API)
- ByteBuffer,存储字节数据到缓冲区
- ShortBuffer,存储字符串数据到缓冲区
- CharBuffer,存储字符数据到缓冲区
- IntBuffer,存储整数数据到缓冲区
- LongBuffer,存储长整型数据到缓冲区
- DoubleBuffer,存储小数到缓冲区
- FloatBuffer,存储小数到缓冲区
Buffer支持类型化的put 和 get,put放入什么数据类型,get就应该使用响应的数据类型来取出,否则可嫩硅油 BufferUnderflowException 异常
buffer属性
属性 | 描述 |
---|---|
Capacity | 容量,即可以容纳的最大数量,在缓冲区创建时被设定并且不能改变 |
Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
Postion | 位置,下一个要被读写的元素的索引,每次读写缓冲区数据都会被改变数值,为下一次的读写准备 |
Mark | 标记 |
Channel(通道)
通道可以同时进行读写,而流只能读or之后hi能写
通道可以实现异步读写数据
通道可以从缓存读数据,也可以写数据到缓存
基本介绍:
- BIO 中的stream似乎单向的,例如FileInputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作
- Channel在NIO中是一个接口
- 常用的Channel 类有: FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel
- FileChannel 用于文件数据的读写,DatagramChannel 用于UDP的数据读写,ServerSocketChannel 和SocketChannel 用于TCP的数据读写。
FileChannel类
FileChannel主要用来对本地文件进行IO操作,常见的方法有:
public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区
public int write(ByteBuffer src),把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src,long position,long count),从目标通道中复制数据到当前通道
public long tansferTo(long postion,long count,WritableByteChannel target),吧数据从当前通道复制给目标通道
write案例
package com.atguigu.nio;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel01
public static void main(String[] args) throws Exception
String str = "四达时代";
//创建一个输出流
FileOutputStream fileOutputStream = new FileOutputStream("D:\\\\file01.txt");
//通过fileOutputStream 获取 对应FileChannel
// 这个fileChannel 真实类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer 再分配1024空间
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入 Buffer
byteBuffer.put(str.getBytes());
//对buffer ,进行flip 功能反转
byteBuffer.flip();
//把缓冲区的数据 写入 FileChannel中
fileChannel.write(byteBuffer);
//关闭流
fileOutputStream.close();
read案例
package com.atguigu.nio;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NIOFileChannel02
public static void main(String[] args) throws Exception
File file = new File("D:\\\\file01.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileInputStream 获取对应的ileChannel
FileChannel channel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将 通道的数据 读入到Buffer
channel.read(byteBuffer);
// 将字节转出 字符串
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
transferFrom案例
Selector(选择器)
Java的NIO,用非阻塞的IO方式,可以使用一个线程,处理多个的客户端链接,就会使用Selector(选择器)
Selector 能够检测多个组测的通道上是否有事件发生(注意:多个channel以事件的方式可以注册到同意哦个Selector),如果有事件发生,便获取事件然后针对每个事件进行响应的处理。这样就可以只用一个单线程曲棍里多个通道,也就是管理多个连接和请求。
只有在链接真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必每个连接都创建一个线程
避免了多线程之间的上下文切换导致额开销。
Selector 方法说明
监控所有注册的通道,当其中有io流
方法 | 说明 |
---|---|
open() | 得到一个选择器对象 |
selcet(long timeout) | 阻塞 xxxx时间,在xxxx毫秒后返回 |
select | 阻塞 |
wakeup() | 唤醒selector |
selectorNow() | 不阻塞,立马返还 |
Seletor、SelectionKey、ServerScokerChannel和SocketChannle 关系图梳理
下图说明:
-
当客户端连接时,会通过ServerSocketChannel 得到SocketChannel
-
Seletor进行监听,select方法,返回有事件发生的通道个数
-
将socketChannel注册到Selector上,register(Selector sel,int ops)方法
,一个selector上可以注册多个SocketChannel
register(Selector sel,int ops) Seletor 是对应的seletor ,ops 是xxx事件
-
注册后返回一个SeletionKey,会和该Seletor 关联(集合)
-
Seletor 进行监听 select 方法,返回有事件发生的通道的个数
-
进一步得到各个SelectionKey(有事件发生)
-
在通过SelectionKey 反向获取SocketChannel,方法channel()
-
可以得到channel
Netty
Reactor
Reactor模式
解释:
- Reator模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将他们同步分派到相应的处理线程
- Reacctor模式适用IO复用监听事件,收到事件后,分发给某个线程(进程),这个点就是网络服务器高并发处理关键
Reactor模式中核心组成:
- Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就向公司的电话接线员,他接听来自客户的电话并将线路转移到适当的联系人。
- Handlers:处理程序执行IO事件完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序响应的IO事件,处理程序执行非阻塞操作。
Reactor模式分类
- 单Reactor单线程
- 单Reactor多线程
- 主从Reactor多线程
单Reactor单线程
优点:模型简单、没有多线程、进程通信、竞争的问题,全部在一个线程中完成
缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能,
单Reactor多线程
说明:
- Reactor对象通过selelct监控客户端请求事件,收到事件后,通过dispatch进行分发
- 如果建立连接请求,然后创建一个Handler 对象处理完成连接后的各种事件
- 如果不是连接请求,则由reactor分发调用连接对应的handler来处理
- handler只负责响应事件,不做具体的业务处理,通过read读取数据会分发worker线程池的某个蓄电池去处理业务
- worker线程池会分配独立线程完成真正的业务,并将结果返回会给handler
- handler收到响应后,通过send将结果返回给client
优点:充分利用多核CPU的处理
缺点:多线程数据共享,访问比较复杂,reactor处理所有事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈
主从Reactor多线程
**工作原理:**针单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行
说明:
- Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
- 当Acceptor 处理连接事件后,MainReactor 将 连接分配给SubReactor
- subreactor 将连接加入到连接队列中进行监听,并创建handler进行各种事件处理
- 当有新的事件发生时,subreactoer 调用对应的handler处理
- handler通过read读取数据,分发给后面的worker线程处理
- worker线程池分配独立的wordler线程进行业务处理,并返回结果
- handler 收到响应的结果后,再通过send将结果返回给client
- Reactor主线程可以对应多个Reactor子线程
优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理
父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据
缺点:编程复杂的较高
Reactor总结
- 响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程、进程的切换开销
- 扩张性好,可以方便的通过增加Reactor实例格式来充分利用CPU资源
- 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性
Netty模型
简单版
说明:
- Boss线程维护Selector,只会关注Accecpt
- 当接受到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护
- 当Worker线程监听到selector中通道发生自己感兴趣二点事件后,就进行处理(就由handler),注意handler 已经加入到通道
进阶版
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y0Nb2Dap-1668606912814)(D:/Typora/%E7%AC%94%E8%AE%B0%E5%9B%BE/)]
详细版
说明:
- netty抽象出俩组线程池 BossGroup 专门负责接受客户端连接、WorkerGroup 专门负责网络的读写
- BossGroup 和 WorkerGroup 类型 都是NioEventLoopGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组合含有多个事件循环,每一个事件循环时NioEventLoop
- NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通信
- NioEventLoopGroup 可以有多个线程,既可以含有多个NioEventLoop
- 每个BossNioEventLoop执行的步骤有3步
- 轮询accept事件
- 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker NioWEventLoop上的select
- 处理任务队列的任务,即runAllTasks
- 每个Worker NIOEventLoop循环执行的的步骤
- 轮询read、write 事件
- 处理io事件,即read、write事件,在对应NioSocketChannel处理
- 处理任务队列的任务,即runAllTasks
- 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器
上述流程代码
Netty客户端,
package com.atguigu.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient
public static void main(String[] args) throws Exception
//客户端只需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try
//创建客户端启动对象
//注意客户端使用的不是 ServerBootSTRAP 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置看客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel ch) throws Exception
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
);
System.out.println("客户端 OK");
//启动客户端 去连接服务器端
//关于
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
finally
group.shutdownGracefully();
Netty客户端Handler (会被客户端调用)
package com.atguigu.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务器", CharsetUtil.UTF_8));
//当通道有读取事件时,会触发
/**
* @param ctx 上下文对象,含有管道 pipeline (管道里面含有好多handler) ,通道 channel(通道注重读和写), 地址
* @param msg 就是客户端发送的数据 ,默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("server ctx =" + ctx);
//将msg 转换ByteBuf
//ByteBuf 是 Netty 提供的 , 不是NIO 的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器端地址:" + ctx.channel().remoteAddress());
//处理异常.一般需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
Netty服务器端,
package com.atguigu.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
//服务端1
public以上是关于详解BIOAIONIO Netty 知识点和工作原理的主要内容,如果未能解决你的问题,请参考以下文章
MQTT---HiveMQ源码详解Netty-MQTT消息事件处理(流程)