Netty入门——组件(EventLoop)

Posted 小志的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门——组件(EventLoop)相关的知识,希望对你有一定的参考价值。

目录

一、EventLoop(事件循环对象)

1.1、EventLoop(事件循环对象)的概述

  • EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

1.2、EventLoop(事件循环对象)的继承关系

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor

二、EventLoopGroup (事件循环组)

2.1、EventLoopGroup (事件循环组)的概述

  • EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

2.2、EventLoopGroup (事件循环组)的继承关系

  • 继承自 netty 自己的 EventExecutorGroup
    (1)、实现了 Iterable 接口提供遍历 EventLoop 的能力;
    (2)、另有 next 方法获取集合中下一个 EventLoop;

三、EventLoopGroup (事件循环组)代码示例

3.1、获取事件循环对象代码示例

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    
  • 代码示例

    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.EventExecutor;
    public class EventLoopTest 
        public static void main(String[] args) 
            // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
            NioEventLoopGroup group = new NioEventLoopGroup(2);
            // 1. 创建事件循环组;DefaultEventLoopGroup只能提交普通任务和定时任务
            //DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
            // 2. 第一种方式:获取下一个事件循环对象
            System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
            System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
            System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
            System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
            // 2. 第二种方式:通过遍历获取下一个事件循环对象
            for(EventExecutor eventLoopGroup :group)
                System.out.println("第二种方式:通过遍历获取下一个事件循环对象=="+eventLoopGroup);
            
        
    
    
  • 输出结果

3.2、执行普通任务代码示例

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    
  • 代码示例

    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.EventExecutor;
    public class EventLoopTest 
        public static void main(String[] args) 
            // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
            NioEventLoopGroup group = new NioEventLoopGroup(2);
            // 2. 执行普通任务
            group.next().execute(()->
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                System.out.println("=====普通任务=====");
            );
            System.out.println("主线程");
        
    
    
  • 输出结果

3.3、执行定时任务代码示例

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    
  • 代码示例

    import io.netty.channel.nio.NioEventLoopGroup;
    import java.util.concurrent.TimeUnit;
    public class EventLoopTest 
        public static void main(String[] args) 
            // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
            NioEventLoopGroup group = new NioEventLoopGroup(2);
            // 4. 执行定时任务
            group.next().scheduleAtFixedRate(()->
                System.out.println("定时任务");
            ,0,1, TimeUnit.SECONDS);
            System.out.println("主线程");
        
    
    
  • 输出结果

3.4、处理 io 事件

3.4.1、引入pom依赖

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    

3.4.2、服务端代码示例

  • 服务端代码示例

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioserverSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    import java.nio.charset.Charset;
    /**
     * @description: 处理io任务 服务端
     * @author: xz
     */
    @Slf4j
    public class EventLoopServer 
        public static void main(String[] args) 
            //1、服务端启动器:负责组装netty组件
            new ServerBootstrap()
                    //2、将EventLoop分为boss和worker(即将EventLoop分工细化)
                    // boss即第1个参数,只负责accept事件; 
                    // worker即第2个参数,只负责socketChannel上的读写
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    //3、选择服务器的 ServerSocketChannel 实现
                    .channel(NioServerSocketChannel.class)
                    //4、添加服务端处理器
                    .childHandler(
                        // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception 
                            //6、添加具体 handler
                            ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() 
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
                                    //msg转ByteBuf
                                    ByteBuf buf = (ByteBuf) msg;
                                    //ByteBuf转字符串
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    //让消息传递给下一个handler
                                    ctx.fireChannelRead(msg);
                                
                            );
                        
                    )
                    //7、绑定监听端口
                    .bind(8080);
        
    
    

3.4.3、客户端代码示例

  • 客户端代码示例

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    /**
     * @description: 处理io任务 客户端
     * @author: xz
     */
    @Slf4j
    public class EventLoopClient 
        public static void main(String[] args) throws InterruptedException 
            // 1. 客户端启动器
            Channel channel = new Bootstrap()
                    // 2. 添加 EventLoop(事件循环)
                    .group(new NioEventLoopGroup(1))
                    // 3. 选择客户端的 SocketChannel 实现
                    .channel(NioSocketChannel.class)
                    // 4. 添加客户端处理器
                    .handler(new ChannelInitializer<NioSocketChannel>() 
                        // 在连接建立后被调用
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                            //9. 消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        
                    )
                    //5. 连接到服务器
                    .connect(new InetSocketAddress("localhost", 8080))
                    //6. 等待 connect 建立连接完毕
                    .sync()
                    //7. 连接对象
                    .channel();
            System.out.println("打印channel对象==="+channel);
            //8. 发送数据
            channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
            Thread.sleep(2000);
            channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
        
    
    

3.4.4、分别启动3次客户端

  • 第一次启动:先启动服务端,再启动客户端,输出结果如下:

    注:第一次启动使用线程名称为nioEventLoopGroup-3-1的进行处理

  • 第二次启动:重新启动客户端,输出结果如下:

    注:第二次启动使用线程名称为nioEventLoopGroup-3-2的进行处理

  • 第三次启动:重新启动客户端,输出结果如下:
    注:第三次启动仍然使用线程名称为nioEventLoopGroup-3-1的进行处理

3.4.5、分别启动3次客户端,查看处理结果

  • 由3.4.4步骤可知,可以看到两个线程(即工人)轮流处理 channel,但线程(即工人)与 channel 之间进行了绑定;

3.5、增加两个非 nio 工人处理 io 事件

3.5.1、引入pom依赖

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    

3.5.2、服务端代码示例

  • 服务端代码示例

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    /**
     * @description: EventLoop处理io任务 服务端
     * @author: xz
     */
    @Slf4j
    public class EventLoopServer 
        public static void main(String[] args) 
            //创建一个独立的EventLoopGroup
            DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
            //1、服务端启动器:负责组装netty组件
            new ServerBootstrap()
                    //2、将EventLoop分为boss和worker(即将EventLoop分工细化)
                    // boss即第1个参数,只负责accept事件; worker即第2个参数,只负责socketChannel上的读写
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    //3、选择服务器的 ServerSocketChannel 实现
                    .channel(NioServerSocketChannel.class)
                    //4、添加服务端处理器
                    .childHandler(
                        // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception 
                            //6、添加具体 handler
                            ch.pipeline().addLast(normalWorkers,"myhandler", new ChannelInboundHandlerAdapter() 
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
                                    //msg转ByteBuf
                                    ByteBuf buf = (ByteBuf) msg;
                                    //ByteBuf转字符串
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    //让消息传递给下一个handler
                                    ctx.fireChannelRead(msg);
                                
                            );
                        
                    )
                    //7、绑定监听端口
                    .bind(8080);
        
    
    

3.5.3、客户端代码示例

  • 客户端代码与3.4.3步骤中代码无变化

3.5.4、分别启动3次客户端

  • 第一次启动:先启动服务端,再启动客户端,输出结果如下:

    注:第一次启动使用线程名称为defaultEventLoopGroup-2-1的进行处理

  • 第二次启动:重新启动客户端,输出结果如下:

    注:第二次启动使用线程名称为defaultEventLoopGroup-2-2的进行处理

  • 第三次启动:重新启动客户端,输出结果如下:
    注:第三次启动仍然使用线程名称为defaultEventLoopGroup-2-1的进行处理

3.5.5、分别启动3次客户端,查看处理结果

  • 由3.5.4步骤可知,可以看到非 nio 工人也分别绑定了 channel,我们自己的 myhandler 由非 nio 工人执行)

以上是关于Netty入门——组件(EventLoop)的主要内容,如果未能解决你的问题,请参考以下文章

Netty入门学习

Netty入门-深入理解设计思想

Netty入门-深入理解设计思想

Netty入门——组件(Channel)一

Netty入门——组件(Channel)二

「Netty实战 03」大白话 Netty 核心组件分析