Netty Source Code Analysis: Pipeline Creation
Posted by xiaobaituyun
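Whether the channel is a NioServerSocketChannel or a NioSocketChannel, construction eventually reaches the constructor of the parent class AbstractChannel, so the pipeline is created at the same time as the channel.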
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.unsafe = this.newUnsafe();
    this.pipeline = new DefaultChannelPipeline(this);
}
What gets created here is a DefaultChannelPipeline. Its constructor stores the channel passed in, creates two default nodes, head and tail, and links them into a doubly linked list.
public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    } else {
        this.channel = channel;
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}
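The head/tail wiring done by this constructor can be sketched without Netty. The following is a minimal model, not Netty code: MiniPipeline, Node, addLast, and names are hypothetical names chosen for illustration. It shows how two sentinel nodes form the initial doubly linked list and how a later insertion slots in just before the tail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for DefaultChannelPipeline; not Netty code.
final class MiniPipeline {
    // One node of the doubly linked list (plays the role of a context).
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    MiniPipeline() {
        // Same wiring as the constructor above: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new node just before the tail sentinel.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail and collect the node names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("decoder").addLast("handler");
        System.out.println(p.names()); // [head, decoder, handler, tail]
    }
}
```

Because head and tail always exist, an insertion never has to handle an empty list, which is presumably why the pipeline creates both sentinels up front.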
Both head and tail implement the ChannelHandlerContext interface, and every node in the pipeline is a ChannelHandlerContext. ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, and ChannelOutboundInvoker: AttributeMap lets a context store custom attributes, while ChannelInboundInvoker and ChannelOutboundInvoker define the methods that trigger events.
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // Returns the Channel this context belongs to
    Channel channel();

    // Returns the EventExecutor (the EventLoop) used by this context
    EventExecutor executor();

    // Returns the ChannelHandler bound to this context
    ChannelHandler handler();

    // Whether the handler of this context has been removed from the pipeline
    boolean isRemoved();

    // The fire* methods below propagate the corresponding event through the pipeline
    ChannelHandlerContext fireChannelRegistered();
    ChannelHandlerContext fireChannelUnregistered();
    ChannelHandlerContext fireChannelActive();
    ChannelHandlerContext fireChannelInactive();
    ChannelHandlerContext fireExceptionCaught(Throwable cause);
    ChannelHandlerContext fireUserEventTriggered(Object evt);
    ChannelHandlerContext fireChannelRead(Object msg);
    ChannelHandlerContext fireChannelReadComplete();
    ChannelHandlerContext fireChannelWritabilityChanged();

    ChannelHandlerContext read();
    ChannelHandlerContext flush();

    // Returns the pipeline this context belongs to
    ChannelPipeline pipeline();
}
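The pipeline's head and tail are instances of HeadContext and TailContext respectively. A close look at the two classes shows that they not only implement ChannelHandlerContext but are themselves ChannelHandlers. Somewhat surprisingly, in the version analyzed here HeadContext implements ChannelOutboundHandler (which handles outbound operations such as writes), while TailContext implements ChannelInboundHandler (which handles inbound events such as reads). This makes sense once the direction of travel is considered: outbound operations move from tail toward head and finish at head, whereas inbound events move from head toward tail and finish at tail.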
As the HeadContext source shows, most of its operations delegate to unsafe.
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
    private static final String HEAD_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.HeadContext.class);
    protected final Channel.Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup) null, HEAD_NAME, false, true);
        this.unsafe = pipeline.channel().unsafe();
    }

    public ChannelHandler handler() {
        return this;
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.bind(localAddress, promise);
    }

    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.connect(remoteAddress, localAddress, promise);
    }

    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.disconnect(promise);
    }

    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.close(promise);
    }

    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.deregister(promise);
    }

    public void read(ChannelHandlerContext ctx) {
        this.unsafe.beginRead();
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.unsafe.write(msg, promise);
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.unsafe.flush();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
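Why an outbound handler sits at the head becomes clearer with a small model of event direction. The sketch below is not Netty code: DirectionSketch, Ctx, build, inboundPath, and outboundPath are hypothetical names. It assumes that the two boolean arguments passed to super(...) in HeadContext and TailContext mark a context as inbound and outbound respectively, and that inbound events skip non-inbound contexts while outbound operations skip non-outbound ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of event direction in a pipeline; not Netty's API.
final class DirectionSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;   // handles inbound events (assumed meaning of the flag)
        final boolean outbound;  // handles outbound operations (assumed meaning of the flag)
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    private static void link(Ctx a, Ctx b) {
        a.next = b;
        b.prev = a;
    }

    // Builds head <-> decoder <-> encoder <-> tail and returns {head, tail}.
    static Ctx[] build() {
        Ctx head = new Ctx("head", false, true);
        Ctx decoder = new Ctx("decoder", true, false);
        Ctx encoder = new Ctx("encoder", false, true);
        Ctx tail = new Ctx("tail", true, false);
        link(head, decoder);
        link(decoder, encoder);
        link(encoder, tail);
        return new Ctx[] {head, tail};
    }

    // Inbound events start after head and walk forward, visiting inbound contexts only.
    static List<String> inboundPath(Ctx head) {
        List<String> path = new ArrayList<>();
        for (Ctx c = head.next; c != null; c = c.next) {
            if (c.inbound) {
                path.add(c.name);
            }
        }
        return path;
    }

    // Outbound operations start before tail and walk backward, visiting outbound contexts only.
    static List<String> outboundPath(Ctx tail) {
        List<String> path = new ArrayList<>();
        for (Ctx c = tail.prev; c != null; c = c.prev) {
            if (c.outbound) {
                path.add(c.name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        Ctx[] ends = build();
        System.out.println(inboundPath(ends[0]));  // [decoder, tail]
        System.out.println(outboundPath(ends[1])); // [encoder, head]
    }
}
```

In this model every outbound operation ends at head, which matches the HeadContext source above: that is the point where the call is handed to unsafe.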
TailContext mainly cleans up after inbound data that no earlier handler consumed; most of its methods are empty.
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    private static final String TAIL_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.TailContext.class);

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup) null, TAIL_NAME, true, false);
    }

    public ChannelHandler handler() {
        return this;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        DefaultChannelPipeline.logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.", cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            DefaultChannelPipeline.logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
}
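The channelRead method above releases any message that travels all the way to the tail. A rough, Netty-free sketch of that rule follows; TailReleaseSketch, Message, InboundHandler, and deliver are hypothetical names, and the counter is only a simplified stand-in for Netty's reference counting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "the tail releases what nobody consumed"; not Netty code.
final class TailReleaseSketch {
    // Simplified stand-in for a reference-counted message.
    static final class Message {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        int refCnt() {
            return refCnt.get();
        }

        void release() {
            if (refCnt.decrementAndGet() < 0) {
                throw new IllegalStateException("released more often than retained");
            }
        }
    }

    // Returns true when the handler consumed (and released) the message.
    interface InboundHandler {
        boolean channelRead(Message msg);
    }

    // Offers the message to each handler in order; if none consumes it,
    // behaves like the tail: discard the message and release it.
    static String deliver(List<InboundHandler> handlers, Message msg) {
        for (InboundHandler h : handlers) {
            if (h.channelRead(msg)) {
                return "consumed";
            }
        }
        msg.release();
        return "discarded at tail";
    }

    public static void main(String[] args) {
        Message msg = new Message();
        InboundHandler passThrough = m -> false; // never consumes the message
        System.out.println(deliver(Arrays.asList(passThrough), msg)); // discarded at tail
        System.out.println(msg.refCnt()); // 0
    }
}
```

The practical takeaway is that a message reaching the tail usually signals a configuration problem: some handler was expected to consume it, and the tail's release is only a safety net against leaks.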