6.netty线程模型-Reactor
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了6.netty线程模型-Reactor相关的知识,希望对你有一定的参考价值。
【README】
1..本文部分内容翻译自:
[Netty] Netty's thread model and simple usage
2.netty模型是以 Reactor模式为基础的,具体的,netty使用的是 主从Reactor多线程模型;
3.先介绍了 Reactor线程模型;后介绍了 Netty 组成部分;
4.文末还po出了 netty服务器与客户端代码实现(总结自 B站《尚硅谷-netty》);
【注意】
- 本文所有图片均转自:
[Netty] Netty's thread model and simple usage
【1】netty线程模型-Reactor
1)Reactor模式
- reactor模式,也叫分发器模式。当1个或多个请求同时发送到服务器时,服务器同步地把它们分发到独立的处理线程;
2)Reactor模式的三种角色:
- Acceptor: (接收器)处理客户端新连接,并分配客户端分发到处理线程链;
- Reacor:(反应器)负责监听,分配事件及IO事件给对应处理器Handler;
- Handler:(处理器)处理事件,如编码,业务逻辑处理,解码等;
3)Reactor的3种线程模型
- 单Reactor单线程模型;
- 单Reactor多线程模型;
- 主从Reactor多线程模型;
而 netty 使用的是 主从 Reactor多线程模型;
4)除建立连接外,服务器处理客户端请求涉及的操作大致有5种:
- IO读;
- 编码;
- 计算;
- 解码;
- IO写;
【2】Reactor3种线程模型
【2.1】单Reactor单线程模型
1)单Reactor单线程模型概述:
- 在该线程模型下,所有请求连接的建立,IO读写,及业务逻辑处理都全部在同一个线程;
- 若耗时操作发生在业务逻辑处理,则所有的请求都会延迟 与阻塞,因为这个线程上的所有操作都是同步的;这一点应该比较好理解;
2)模型图
【图解】
- 服务器 仅 有一个 Reactor 线程;
- IO读,解码,计算,编码,发送(IO写)这5个步骤都由单个Reactor 线程来完成,一旦有一个步骤耗时过长,则整体阻塞;
【优缺点】
- 缺点:因为只有一个Reactor线程, 请求1耗时长会导致其他请求阻塞直到请求1处理完成;服务器并发性非常低;
【2.2】单Reactor多线程模型
1)单Reactor多线程模型概述:
- 为了防止阻塞,请求连接的建立(包括认证与授权)及IO读写都在 Reactor线程里;
- 此外,业务逻辑处理完全由异步线程池负责处理,并在处理后把结果写回;
2)模型图
【图解】
- 服务器 仅 有一个 Reactor 线程;
- IO读与发送(IO写)由 Reactor线程负责执行;
- 解码,计算,编码等业务逻辑处理由 工作线程池(WorkThreads)负责分配线程执行;
【优缺点】
- 优点:解码,计算,编码等耗时操作若有阻塞,不会导致服务器的其他请求阻塞;
- 缺点:仅有一个 Reactor线程来处理客户端连接,若客户端IO读写数据量大,容易在IO读写时发生阻塞;
【2.3】主从Reactor多线程模型
1)主从Reactor多线程模型概述:由于单Reactor线程会降低多核cpu能力(或未能发挥),所以需要建立多Reactor线程,即主从Reactor线程模型。
- 主Reactor线程(单个):用于建立请求的连接(包括认证与授权);
- 从Reactor线程(多个):用于处理IO读写;
- 补充:其他业务逻辑如解码,计算,编码等工作还是交由异步线程池(Worker线程池)处理;
【图解】主从Reactor多线程模型下,服务器线程有3类:
- 第1类:主Reactor线程负责客户端连接的建立(同步);
- 第2类:子Reactor线程有多个,封装在线程池,异步处理客户端的IO读写(read 与 send);
- 第3类:工作线程池:异步处理业务逻辑,包括 编码,计算,解码等操作;
【优缺点】
- 优点1:显然,IO读写若有阻塞,不会导致其他客户端请求阻塞;
- 优点2:显然,业务逻辑若有阻塞(耗时操作),也不会导致其他客户端请求阻塞;
- 小结:在主从Reactor多线程模型条,服务器的并发性非常高(也充分利用了多核cpu的算力);
【3】netty线程模型
【3.1】成员关系
1)netty线程模型包含 ServerBootStrap,NioEventLoopGroup 及其组成部分 NioEventLoop,NioChannel, ChannelPipeline, ChannelHandler 等;
- ServerBootStrap:服务器启动引导对象;
- NioEventLoopGroup:Nio 事件循环组;
- NioEventLoop:Nio事件循环;
- NioChannel:Nio通道;
- ChannelPipeline:通道管道(封装了多个处理器);
- ChannelHandler:处理器;
2)netty使用主从Reactor多线程模型来实现。
- 主Reactor:对应 BossGroup;
- 从(子)Reactor:对应 WorkerGroup;
- BossGroup 用于接收连接并通过通道注册 与 WorkerGroup已建立的连接;
- 当IO事件被触发,该事件交由管道处理;管道处理实际上是由对应的处理器Handler来处理;
【3.2】EventLoop 事件循环
1)即 事件循环器;顾名思义,他实际上是一个循环的处理过程;
2)EventLoop 等同于 while(true) 死循环里的代码段;
3)EventLoop 主要包含了一个选择器多路复用器和一个任务队列
- 选择器多路复用器:处理IO事件的;
- 任务队列:存储已提交任务;
4)EventLoop并不是从程序启动时就开始运行,而是当有任务提交时,它才会开始处理任务并一直运行;
【3.3】通道 channel
- 1)netty封装了 java的本地Channel,并引入了 管道与通道处理器的概念。
- 2)通道具体的IO事件处理是通过管道和通道处理器(管道中)来完成的;
【4】netty服务器与客户端-代码实现
1)初始化:
- 服务器通过 ServerBootStrap,可以直接设置线程组;
- 服务器的bossGroup(boss线程组)用于处理 NioserverSocketChannel 通道的 Accept 事件,即处理请求连接与认证;
- 服务器的 workerGroup(工作线程组)用于处理 IO 读写事件及已提交的异步任务;
2)ServerBootStrap初始化 代码实现:
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1); // 设置boss线程组中的线程个数为1 (默认是cpu核数*2)
EventLoopGroup work = new NioEventLoopGroup(8); // 设置worker线程组中的线程个数为8 (默认是cpu核数*2)
bootstrap.group(boss, work).channel(NioServerSocketChannel.class); // 把参数设置到bootstrap
【4.1】服务器
1)netty服务器
/**
* @Description 简单netty服务器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年08月27日
*/
public class SimpleNettyServer44
public static void main(String[] args) throws InterruptedException
// 创建 BossGroup 和 WorkerGroup
// 1. 创建2个线程组 bossGroup, workerGroup
// 2 bossGroup 仅处理连接请求; 真正的业务逻辑,交给workerGroup完成;
// 3 两个线程组都是无限循环
// 4 bossGroup 和 workerGroup 含有的子线程(NIOEventLoop)个数
// 默认是 cpu核数 * 2
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGruop = new NioEventLoopGroup();
try
// 创建服务器端的启动对象, 配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGruop) // 设置2个线程组
.channel(NioServerSocketChannel.class) // 使用NIOSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() // 创建一个通道初始化对象
// 给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
socketChannel.pipeline().addLast(new SimpleNettyServerHandler45());
); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println("... server is ready.");
// 启动服务器, 绑定端口并同步处理 ,生成一个 ChannelFuture对象
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
channelFuture.addListener((future1) -> System.out.println("Finish binding"));
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
finally
// 优雅关闭
boosGroup.shutdownGracefully();
workerGruop.shutdownGracefully();
2)netty服务器中的处理器(封装到管道)
/**
* @Description netty服务器处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年08月27日
*/
public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter
// 读写数据事件(读取客户端发送的消息)
// 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
// 2. Object msg: 客户端发送的数据,默认是 Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("server ctx = " + ctx);
System.out.println("查看 channel 和 pipeline的关系 ");
Channel channel = ctx.channel();
ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈
// 将 msg 转为 ByteBuf 字节缓冲
// 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
// 数据读取完毕,回复客户端
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
// writeAndFlush 是 write + flush ;把数据写入到缓冲,并刷新
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
channelFuture.addListener(future -> System.out.println("回复成功"));
// 处理异常,关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
ctx.channel().close();
【4.2】客户端
1)netty客户端
/**
* @Description netty客户端
* @author xiao tang
* @version 1.0.0
* @createTime 2022年08月27日
*/
public class SimpleNettyClient46
public static void main(String[] args) throws InterruptedException
// 客户端需要一个事件循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try
// 创建客户端启动对象, 注意是 BootStrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(eventLoopGroup) // 设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道实现类(反射)
.handler(new ChannelInitializer<SocketChannel>()
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
socketChannel.pipeline().addLast(new SimpleNettyClientHaandler()); // 加入自己的处理器
);
System.out.println("client is ok");
// 启动客户端去连接服务器
// ChannelFuture, 设计到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
finally
eventLoopGroup.shutdownGracefully();
2)netty客户端中的处理器
/**
* @Description netty客户端处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年08月27日
*/
public class SimpleNettyClientHaandler extends ChannelInboundHandlerAdapter
// 当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", StandardCharsets.UTF_8));
// 当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复消息:" + byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址:" + ctx.channel().remoteAddress());
// 捕获异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
cause.printStackTrace();
ctx.close();
【4.3】演示效果
server端:
... server is ready.
server ctx = ChannelHandlerContext(SimpleNettyServerHandler45#0, [id: 0x2995c48c, L:/127.0.0.1:6668 - R:/127.0.0.1:49427])
查看 channel 和 pipeline的关系
客户端发送消息:hello, 服务器
客户端地址:/127.0.0.1:49427
client端:
client is ok
client ChannelHandlerContext(SimpleNettyClientHaandler#0, [id: 0xe92524d8, L:/127.0.0.1:49427 - R:/127.0.0.1:6668])
服务器回复消息:hello, 客户端
服务器地址:/127.0.0.1:6668
【5】 netty小结
- netty使用了 主从Reactor多线程模型;
- EventLoop是netty处理请求,io事件及其他操作的检测与分配(这句翻译的不好,最好查看原文);
- netty引入了 通道Channel,管道 Pipeline,通道处理器 ChannelHandler 以异步处理任务;
- netty提供了 ServerBootStrap 简化了管道初始化;
以上是关于6.netty线程模型-Reactor的主要内容,如果未能解决你的问题,请参考以下文章
Netty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介