Netty入门——组件(EventLoop)
Posted 小志的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门——组件(EventLoop)相关的知识,希望对你有一定的参考价值。
目录
一、EventLoop(事件循环对象)
1.1、EventLoop(事件循环对象)的概述
- EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
1.2、EventLoop(事件循环对象)的继承关系
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor
二、EventLoopGroup (事件循环组)
2.1、EventLoopGroup (事件循环组)的概述
- EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
2.2、EventLoopGroup (事件循环组)的继承关系
- 继承自 netty 自己的 EventExecutorGroup
(1)、实现了 Iterable 接口提供遍历 EventLoop 的能力;
(2)、另有 next 方法获取集合中下一个 EventLoop;
三、EventLoopGroup (事件循环组)代码示例
3.1、获取事件循环对象代码示例
-
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
-
代码示例
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.EventExecutor; public class EventLoopTest public static void main(String[] args) // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务 NioEventLoopGroup group = new NioEventLoopGroup(2); // 1. 创建事件循环组;DefaultEventLoopGroup只能提交普通任务和定时任务 //DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); // 2. 第一种方式:获取下一个事件循环对象 System.out.println("第一种方式:获取下一个事件循环对象===="+group.next()); System.out.println("第一种方式:获取下一个事件循环对象===="+group.next()); System.out.println("第一种方式:获取下一个事件循环对象===="+group.next()); System.out.println("第一种方式:获取下一个事件循环对象===="+group.next()); // 2. 第二种方式:通过遍历获取下一个事件循环对象 for(EventExecutor eventLoopGroup :group) System.out.println("第二种方式:通过遍历获取下一个事件循环对象=="+eventLoopGroup);
-
输出结果
3.2、执行普通任务代码示例
-
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
-
代码示例
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.EventExecutor; public class EventLoopTest public static void main(String[] args) // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务 NioEventLoopGroup group = new NioEventLoopGroup(2); // 2. 执行普通任务 group.next().execute(()-> try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace(); System.out.println("=====普通任务====="); ); System.out.println("主线程");
-
输出结果
3.3、执行定时任务代码示例
-
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
-
代码示例
import io.netty.channel.nio.NioEventLoopGroup; import java.util.concurrent.TimeUnit; public class EventLoopTest public static void main(String[] args) // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务 NioEventLoopGroup group = new NioEventLoopGroup(2); // 4. 执行定时任务 group.next().scheduleAtFixedRate(()-> System.out.println("定时任务"); ,0,1, TimeUnit.SECONDS); System.out.println("主线程");
-
输出结果
3.4、处理 io 事件
3.4.1、引入pom依赖
-
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
3.4.2、服务端代码示例
-
服务端代码示例
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioserverSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; /** * @description: 处理io任务 服务端 * @author: xz */ @Slf4j public class EventLoopServer public static void main(String[] args) //1、服务端启动器:负责组装netty组件 new ServerBootstrap() //2、将EventLoop分为boss和worker(即将EventLoop分工细化) // boss即第1个参数,只负责accept事件; // worker即第2个参数,只负责socketChannel上的读写 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) //3、选择服务器的 ServerSocketChannel 实现 .channel(NioServerSocketChannel.class) //4、添加服务端处理器 .childHandler( // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel ch) throws Exception //6、添加具体 handler ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception //msg转ByteBuf ByteBuf buf = (ByteBuf) msg; //ByteBuf转字符串 log.debug(buf.toString(Charset.defaultCharset())); //让消息传递给下一个handler ctx.fireChannelRead(msg); ); ) //7、绑定监听端口 .bind(8080);
3.4.3、客户端代码示例
-
客户端代码示例
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * @description: 处理io任务 客户端 * @author: xz */ @Slf4j public class EventLoopClient public static void main(String[] args) throws InterruptedException // 1. 客户端启动器 Channel channel = new Bootstrap() // 2. 添加 EventLoop(事件循环) .group(new NioEventLoopGroup(1)) // 3. 选择客户端的 SocketChannel 实现 .channel(NioSocketChannel.class) // 4. 添加客户端处理器 .handler(new ChannelInitializer<NioSocketChannel>() // 在连接建立后被调用 @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception //9. 消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出 nioSocketChannel.pipeline().addLast(new StringEncoder()); ) //5. 连接到服务器 .connect(new InetSocketAddress("localhost", 8080)) //6. 等待 connect 建立连接完毕 .sync() //7. 连接对象 .channel(); System.out.println("打印channel对象==="+channel); //8. 发送数据 channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes())); Thread.sleep(2000); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
3.4.4、分别启动3次客户端
-
第一次启动:先启动服务端,再启动客户端,输出结果如下:
注:第一次启动使用线程名称为nioEventLoopGroup-3-1的进行处理
-
第二次启动:重新启动客户端,输出结果如下:
注:第二次启动使用线程名称为nioEventLoopGroup-3-2的进行处理
-
第三次启动:重新启动客户端,输出结果如下:
注:第三次启动仍然使用线程名称为nioEventLoopGroup-3-1的进行处理
3.4.5、分别启动3次客户端,查看处理结果
- 由3.4.4步骤可知,可以看到两个线程(即工人)轮流处理 channel,但线程(即工人)与 channel 之间进行了绑定;
3.5、增加两个非 nio 工人处理 io 事件
3.5.1、引入pom依赖
-
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
3.5.2、服务端代码示例
-
服务端代码示例
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; /** * @description: EventLoop处理io任务 服务端 * @author: xz */ @Slf4j public class EventLoopServer public static void main(String[] args) //创建一个独立的EventLoopGroup DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2); //1、服务端启动器:负责组装netty组件 new ServerBootstrap() //2、将EventLoop分为boss和worker(即将EventLoop分工细化) // boss即第1个参数,只负责accept事件; worker即第2个参数,只负责socketChannel上的读写 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) //3、选择服务器的 ServerSocketChannel 实现 .channel(NioServerSocketChannel.class) //4、添加服务端处理器 .childHandler( // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel ch) throws Exception //6、添加具体 handler ch.pipeline().addLast(normalWorkers,"myhandler", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception //msg转ByteBuf ByteBuf buf = (ByteBuf) msg; //ByteBuf转字符串 log.debug(buf.toString(Charset.defaultCharset())); //让消息传递给下一个handler ctx.fireChannelRead(msg); ); ) //7、绑定监听端口 .bind(8080);
3.5.3、客户端代码示例
- 客户端代码与3.4.3步骤中代码无变化
3.5.4、分别启动3次客户端
-
第一次启动:先启动服务端,再启动客户端,输出结果如下:
注:第一次启动使用线程名称为defaultEventLoopGroup-2-1的进行处理
-
第二次启动:重新启动客户端,输出结果如下:
注:第二次启动使用线程名称为defaultEventLoopGroup-2-2的进行处理
-
第三次启动:重新启动客户端,输出结果如下:
注:第三次启动仍然使用线程名称为defaultEventLoopGroup-2-1的进行处理
3.5.5、分别启动3次客户端,查看处理结果
- 由3.5.4步骤可知,可以看到非 nio 工人也分别绑定了 channel,我们自己的 myhandler 由非 nio 工人执行)
以上是关于Netty入门——组件(EventLoop)的主要内容,如果未能解决你的问题,请参考以下文章