Netty入门——组件(Channel)一

Posted 小志的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门——组件(Channel)一相关的知识,希望对你有一定的参考价值。

目录

一、channel的主要作用

channel 中的方法作用
close()用来关闭 channel
closeFuture()处理 channel 的关闭,sync 方法作用是同步等待 channel 关闭,addListener 方法是异步等待 channel 关闭
pipeline()添加处理器
write()将数据写入
writeAndFlush()将数据写入并刷出

二、EventLoop处理io任务代码示例

2.1、服务端代码示例

  • 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.39.Final</version>
     </dependency>
    
  • 服务端

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioserverSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    /**
     * @description: EventLoop处理io任务 服务端
     * @author: xz
     */
    @Slf4j
    public class EventLoopServer 
        public static void main(String[] args) 
            //创建一个独立的EventLoopGroup
            DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
            //1、服务端启动器:负责组装netty组件
            new ServerBootstrap()
                    //2、将EventLoop分为boss和worker(即将EventLoop分工细化)
                    // boss即第1个参数,只负责accept事件; worker即第2个参数,只负责socketChannel上的读写
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    //3、选择服务器的 ServerSocketChannel 实现
                    .channel(NioServerSocketChannel.class)
                    //4、添加服务端处理器
                    .childHandler(
                        // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception 
                            //6、添加具体 handler
                            ch.pipeline().addLast(normalWorkers,"handler1", new ChannelInboundHandlerAdapter() 
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
                                    //msg转ByteBuf
                                    ByteBuf buf = (ByteBuf) msg;
                                    //ByteBuf转字符串
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    //让消息传递给下一个handler
                                    ctx.fireChannelRead(msg);
                                
                            );
                        
                    )
                    //7、绑定监听端口
                    .bind(8080);
        
    
    

2.2、客户端代码示例

  • 客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    import java.net.InetSocketAddress;
    /**
     * @description: EventLoop处理io任务 客户端
     * @author: xz
     */
    @Slf4j
    public class EventLoopClient 
        public static void main(String[] args) throws InterruptedException 
            // 1. 客户端启动器
            Channel channel = new Bootstrap()
                    // 2. 添加 EventLoop(事件循环)
                    .group(new NioEventLoopGroup(1))
                    // 3. 选择客户端的 SocketChannel 实现
                    .channel(NioSocketChannel.class)
                    // 4. 添加客户端处理器
                    .handler(new ChannelInitializer<NioSocketChannel>() 
                        // 在连接建立后被调用
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                            //9. 消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        
                    )
                    //5. 连接到服务器
                    .connect(new InetSocketAddress("localhost", 8080))
                    //6. 等待 connect 建立连接完毕
                    .sync()
                    //7. 连接对象
                    .channel();
            System.out.println("打印channel对象==="+channel);
            //8. 发送数据
            channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
        
    
    

2.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:
  • 再查看服务端控制台输出,结果如下:

三、ChannelFuture连接问题代码示例

3.1、服务端代码示例

  • 同步2.1步骤中的代码

3.2、客户端代码示例

  • 将2.2步骤中客户端代码拆开,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    /**
     * @description: EventLoop处理io任务中ChannelFuture连接问题
     * @author: xz
     */
    @Slf4j
    public class ChannelFutureClient 
        public static void main(String[] args) throws InterruptedException 
            client1();
        
         /**
         * 将客户端代码拆开
         * ChannelFuture连接问题 : connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
         * */
        public static void client1() throws InterruptedException 
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup(1))
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                            nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        
                    )
                    //1、连接到服务器
                    //异步非阻塞,main方法发起了调用,真正执行connect是nio线程
                    //返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
                    .connect(new InetSocketAddress("localhost", 8080));
            //无阻塞向下执行获取channel
            Channel channel = channelFuture.channel();
            log.info("连接未建立,channel对象=====",channel);
            channel.writeAndFlush("aaaaaaaaaaaaaaaaaa");
        
     
    

3.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:

  • 再查看服务端控制台输出,结果如下:

3.4、ChannelFuture出现连接问题的原因

  • 由上述代码示例可知,connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象。

四、ChannelFuture连接问题的处理方式一(使用sync方法同步处理结果)

4.1、服务端代码示例

  • 同步2.1步骤中的代码

4.2、客户端代码示例

  • 将3.2步骤中客户端代码进行修改,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    /**
     * @description: EventLoop处理io任务中ChannelFuture连接问题及处理结果
     * @author: xz
     */
    @Slf4j
    public class ChannelFutureClient 
        public static void main(String[] args) throws InterruptedException 
            client2();
        
        public static void client2() throws InterruptedException 
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup(1))
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                            nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        
                    )
                    //1、连接到服务器
                    //异步非阻塞,main方法发起了调用,真正执行connect是nio线程
                    .connect(new InetSocketAddress("localhost", 8080));
            //无阻塞向下执行获取channel
            Channel channel = channelFuture.channel();
            log.info("连接未建立,打印channel对象=====",channel);//2
            //使用sync方法同步处理结果,阻塞当前线程,直到nio线程连接建立完毕
            channelFuture.sync(); // 3
            log.info("建立连接后,打印channel对象=====",channel); // 4
            channel.writeAndFlush("bbbbbbbbb");
        
    
    

4.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:

  • 再查看服务端控制台输出,结果如下:

4.4、客户端代码示例标注位置解释

  • 执行到 2 位置时,连接未建立,打印[id: 0x4de78375]
  • 执行到 3 位置时,sync 方法是同步等待连接建立完成
  • 执行到 4 位置时,连接肯定建立了,打印[id: 0x4de78375, L:/127.0.0.1:53147 - R:localhost/127.0.0.1:8080]

五、ChannelFuture连接问题的处理方式二(使用addListener方法异步处理结果)

5.1、服务端代码示例

  • 同步2.1步骤中的代码

5.2、客户端代码示例

  • 将3.2步骤中客户端代码进行修改,代码如下

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    /**
     * @description: EventLoop处理io任务中ChannelFuture连接问题及处理结果
     * @author: xz
     */
    @Slf4j
    public class ChannelFutureClient 
        public static void main(String[] args) throws InterruptedException 
            client3();
        
        public static void client3() throws InterruptedException 
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup(1))
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() 
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception 
                            nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        
                    )
                    //1、连接到服务器
                    //异步非阻塞,main方法发起了调用,真正执行connect是nio线程
                    .connect(new InetSocketAddress("localhost", 8080));
            //无阻塞向下执行获取channel
            Channel channel = channelFuture.channel();
            log.info("连接未建立,打印channel对象=====",channel);  // 2
            channelFuture.addListener((ChannelFutureListener) future-> 
                log.info("建立连接后,打印channel对象=====",future.channel());//3
                channel.writeAndFlush("ccccccccccc");
            );
        
    
    

5.3、服务端和客户端查看控制台输出结果

  • 先启动服务端,再启动客户端,查看客户端控制台输出,结果如下:

  • 再查看服务端控制台输出,结果如下:

5.4、客户端代码示例标注位置解释

  • 执行到 2 位置时,连接未建立,打印[id: 0x01485347]
  • ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 3 位置时,连接肯定建立了,打印[id: 0x01485347, L:/127.0.0.1:53380 - R:localhost/127.0.0.1:8080]

以上是关于Netty入门——组件(Channel)一的主要内容,如果未能解决你的问题,请参考以下文章

二.Netty入门到超神系列-Java NIO 三大核心(selector,channel,buffer)

Netty入门学习

Netty入门学习

Netty入门之PC聊天室

Java面试基础知识,java语言编程入门

Java NIO三组件——Selecotr/Channel实现原理解析