Netty入门——Handler & Pipeline
Posted 小志的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty入门——Handler & Pipeline相关的知识,希望对你有一定的参考价值。
目录
一、Handler & Pipeline 的概述
1.1、ChannelHandler
- ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果。
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工。
1.2、Pipeline 的概述
- 所有 ChannelHandler 被连成一串,就是 Pipeline。
二、入站处理器代码示例
2.1、服务端代码
-
服务端代码
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioserverSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; /** * @description: * @author: xz */ @Slf4j public class PipelineServer public static void main(String[] args) getChannelInboundHandlerAdapter(); /** * 入站处理器示例 * */ public static void getChannelInboundHandlerAdapter() new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel ch) throws Exception // 1. 通过 channel 拿到 pipeline ChannelPipeline pipeline = ch.pipeline(); // 2. 添加入站处理器 head -> h1 -> tail pipeline.addLast("h1", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("1"); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student); super.channelRead(ctx, msg);//a ); // 2. 添加入站处理器 head -> h1 -> h2-> tail pipeline.addLast("h2", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("2"); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student); super.channelRead(ctx, msg);//b ); // 2. 添加入站处理器 head -> h1 -> h2 -> h3 -> tail pipeline.addLast("h3", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("3"); ); ) .bind(8080);
2.2、客户端代码
-
客户端代码
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; /** * @description: * @author: xz */ public class PipelineClient public static void main(String[] args) throws InterruptedException client1(); /** * 客户端代码 * */ public static void client1() throws InterruptedException new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); nioSocketChannel.pipeline().addLast(new StringEncoder()); ) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel() .writeAndFlush("hello world");
2.3、服务端输出结果
- 先启动服务端,再启动客户端,查看服务端输出结果
2.4、入站处理器服务端代码示例标注位置解释
- 入站处理器中, super.channelRead(ctx, msg) 是 调用下一个入站处理器;
- 如果注释掉 a 位置处代码,则仅会打印 1;
- 如果注释掉 b 位置处代码,则仅会打印 1 2;
三、出站处理器代码示例
3.1、服务端代码
-
服务端代码
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; /** * @description: * @author: xz */ @Slf4j public class PipelineServer public static void main(String[] args) getChannelOutboundHandlerAdapter(); /** * 出站处理器示例 * */ public static void getChannelOutboundHandlerAdapter() new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel ch) throws Exception // 1. 通过 channel 拿到 pipeline ChannelPipeline pipeline = ch.pipeline(); // 2. 添加处理器 head -> h1 -> tail pipeline.addLast("h1", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("1"); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(msg); super.channelRead(ctx, msg);//a ); // 3. 添加处理器 head -> h1 -> h2 -> tail pipeline.addLast("h2", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("2"); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(msg); super.channelRead(ctx, msg);//b ); // 4. 添加处理器 head -> h1 -> h2 -> -> h3 -> tail pipeline.addLast("h3", new ChannelInboundHandlerAdapter() @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception log.debug("3"); /** * 只有向channel中写入数据才会触发出站处理器 * (即只有执行了writeAndFlush方法,才会触发ChannelOutboundHandlerAdapter) */ ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));//c ); // 5. 添加处理器 head -> h1 -> h2 -> h3 -> h4 -> tail pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception log.debug("4"); super.write(ctx, msg, promise);//d ); // 6. 添加处理器 head -> h1 -> h2 -> h3 -> h4 -> h5 -> tail pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception log.debug("5"); super.write(ctx, msg, promise);//e ); // 7. 添加处理器 head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception log.debug("6"); super.write(ctx, msg, promise);//f ); ) .bind(8080);
3.2、客户端代码
-
客户端代码
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; /** * @description: * @author: xz */ public class PipelineClient public static void main(String[] args) throws InterruptedException client1(); /** * 客户端代码 * */ public static void client1() throws InterruptedException new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); nioSocketChannel.pipeline().addLast(new StringEncoder()); ) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel() .writeAndFlush("hello world");
3.3、服务端输出结果
- 先启动服务端,再启动客户端,查看服务端输出结果
- 由输出结果可知,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
3.4、出站处理器服务端代码示例标注位置解释
- 出站处理器中, super.channelRead(ctx, msg) 是 调用下一个入站处理器;
(1)、如果注释掉代码 a 位置,则仅会打印 1;
(2)、如果注释掉代码 b 位置,则仅会打印 1 2; - 类似的,出站处理器中,super.write(ctx, msg, promise) 的调用也会 触发上一个出站处理器;
(1)、如果注释掉 代码 f 位置,则仅会打印 1 2 3 6 - 代码 c 位置的 ch.writeAndFlush(ctx.alloc().buffer().writeBytes(“server…”.getBytes())); 会 从尾部开始触发 后续出站处理器的执行;
(1)、如果注释掉 代码 c 位置,则仅会打印 1 2 3
以上是关于Netty入门——Handler & Pipeline的主要内容,如果未能解决你的问题,请参考以下文章
Day475.GoogleProtobuf&Netty编解码器和handler 的调用机制 -netty