7.提交任务到 NioEventLoop(Nio事件循环执行线程)
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7.提交任务到 NioEventLoop(Nio事件循环执行线程)相关的知识,希望对你有一定的参考价值。
【README】
1.本文总结自 B站 《尚硅谷-netty》;
2.NioEventLoop实际上是一个单独线程,执行 taskQueue中的任务(串行);
【1】提交任务到NioEventLoop
1)NioEventLoop:
- 表示一个不断循环执行任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的 socket 网络通道;
2)NioEventLoop 内部采用串行化设计:
- 从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责;
3)NioEventLoopGroup 下包含多个 NioEventLoop ;
- 每个 NioEventLoop 中包含有一个 Selector,一个taskQueue ;
- 每个 NioEventLoop 的 Selector 上可以注册监听多个NioChannel ;
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上;
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline ;
4)任务提交到 NioEventLoop 后,实际会添加到 taskQueue ;
- taskQueue的访问路径如下:
- ChannelHandlerContext ->
- DefaultChannelPipeline ->
- NiosocketChannel ->
- NioEventLoop ->
- taskQueue (任务队列);
【1.1】场景1-在 netty server的handler中提交普通任务
/**
* @Description netty服务器处理器
* @author xiao tang
* @version 1.0.0
* @createTime 2022年08月27日
*/
public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter
// 读写数据事件(读取客户端发送的消息)
// 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
// 2. Object msg: 客户端发送的数据,默认是 Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
System.out.println("server ctx = " + ctx);
System.out.println("查看 channel 和 pipeline的关系 ");
Channel channel = ctx.channel();
ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈
// 将 msg 转为 ByteBuf 字节缓冲
// 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
// 同步任务1
// 业务场景: 有一个耗时长的业务 -> 异步执行 -> 提交该 channel对应的 NIOEventLoop 的 taskQueue 中;
// Thread.sleep(10 * 1000);
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是同步任务1", StandardCharsets.UTF_8));
// System.out.println("go on.....");
// 以上耗时操作的解决方案1:用户程序自定义的普通任务
// 异步任务2
ctx.channel().eventLoop().execute(new Runnable()
@Override
public void run()
try
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务2-休眠10s", StandardCharsets.UTF_8));
catch (Exception e)
System.out.println("发生异常, " + e.getMessage());
);
// 异步任务3
ctx.channel().eventLoop().execute(new Runnable()
@Override
public void run()
try
Thread.sleep(20 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务3-休眠20s", StandardCharsets.UTF_8));
catch (Exception e)
System.out.println("发生异常, " + e.getMessage());
);
【1.2】场景2-在 netty server的handler中提交定时任务
// 异步任务2和异步任务3添加到同一个任务队列,由同一个线程来运行,所以异步任务2阻塞10s,而异步任务3会阻塞30s(10+20)
System.out.println("异步任务 go on.....");
// 用户自定义定时任务 -》 定时任务提交到 scheduledTaskQueue 中
ctx.channel().eventLoop().schedule(new Runnable()
@Override
public void run()
try
Thread.sleep(20 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是定时任务1-休眠5s", StandardCharsets.UTF_8));
catch (Exception e)
System.out.println("发生异常, " + e.getMessage());
, 5, TimeUnit.SECONDS);
System.out.println("定时任务 go on.....");
【代码解说】
- 以上任务提交后,实际会添加到 taskQueue ;
- taskQueue的访问路径如下:
- ChannelHandlerContext ->
- DefaultChannelPipeline ->
- NioSocketChannel ->
- NioEventLoop ->
- taskQueue (任务队列);
- 每个NioEventLoop只能使用1个独立线程运行任务队列中的任务,即多个任务串行执行;
【补充】taskQueue目录树:
以上是关于7.提交任务到 NioEventLoop(Nio事件循环执行线程)的主要内容,如果未能解决你的问题,请参考以下文章
Netty源码分析 NioEventLoop的rebuildSelector方法解决Nio中select方法导致cpu100%的BUG
Netty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介