Netty源码分析之Pipeline创建

Posted xiaobaituyun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty源码分析之Pipeline创建相关的知识,希望对你有一定的参考价值。

  不论是NioserverSocketChannel,还是NioSocketChannel,最终都会调用父类AbstractChannel的构造函数,pipeline也在channel被创建的时候被创建。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.unsafe = this.newUnsafe();
    this.pipeline = new DefaultChannelPipeline(this);
}

  而这里是创建了一个DefaultChannelPipeline。构造函数会保存传入的channel,并且默认创建两个节点head和tail并将它们构成双向链表的结构。

public DefaultChannelPipeline(AbstractChannel channel) {
    if(channel == null) {
        throw new NullPointerException("channel");
    } else {
        this.channel = channel;
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}

  而这里的head和tail都是实现了ChannelHeadContext接口,在pipeline中每一个节点都是ChannelHeadContext,ChannelHeadContext实现了AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker,其中AttributeMap代表ChannelHeadContext可以存储自定义的属性,而ChannelInboundInvoker, ChannelOutboundInvoker则是定义一些事件的触发方法。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
    // 返回对应的Channel对象
    Channel channel();
    // 返回对应的EventLoop对象
    EventExecutor executor();
    // 返回对应的ChannelHandler对象
    ChannelHandler handler();
    // 该Context对应的Handler是否从pipeline中移除
    boolean isRemoved();
    /*******************以下的fire方法都是触发对应事件在pipeline中传播*********************/
    ChannelHandlerContext fireChannelRegistered();

    ChannelHandlerContext fireChannelUnregistered();

    ChannelHandlerContext fireChannelActive();

    ChannelHandlerContext fireChannelInactive();

    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    ChannelHandlerContext fireUserEventTriggered(Object evt);

    ChannelHandlerContext fireChannelRead(Object msg);

    ChannelHandlerContext fireChannelReadComplete();

    ChannelHandlerContext fireChannelWritabilityChanged();

    ChannelHandlerContext read();

    ChannelHandlerContext flush();
    // 返回对应的pipeline
    ChannelPipeline pipeline();
}

  pipeline中的head和tail分别新建是HeadContext和TailContext,仔细看这两个类的源码,发现它们不仅仅是实现ChannelHandlerContext,而且本身还是一个channelHandler,而让人比较意外的是,HeadContext实现的是ChannelOutboundHandler(处理输出字节),TailContext则是ChannelInboundHandler(处理输入字节)。

  从HeadContext的源码可以看出,它的很多操作还是依赖于unsafe的。

static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
    private static final String HEAD_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.HeadContext.class);
    protected final Channel.Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup)null, HEAD_NAME, false, true);
        this.unsafe = pipeline.channel().unsafe();
    }

    public ChannelHandler handler() {
        return this;
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.bind(localAddress, promise);
    }

    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        this.unsafe.connect(remoteAddress, localAddress, promise);
    }

    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.disconnect(promise);
    }

    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.close(promise);
    }

    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        this.unsafe.deregister(promise);
    }

    public void read(ChannelHandlerContext ctx) {
        this.unsafe.beginRead();
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.unsafe.write(msg, promise);
    }

    public void flush(ChannelHandlerContext ctx) throws Exception {
        this.unsafe.flush();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

  TailContext主要就是对输入数据的一个收尾工作,它的很多方法都为空。

static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    private static final String TAIL_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.TailContext.class);

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, (EventExecutorGroup)null, TAIL_NAME, true, false);
    }

    public ChannelHandler handler() {
        return this;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        DefaultChannelPipeline.logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.", cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            DefaultChannelPipeline.logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }

    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
}

以上是关于Netty源码分析之Pipeline创建的主要内容,如果未能解决你的问题,请参考以下文章

netty源码之业务逻辑处理

netty源码之业务逻辑处理

netty源码之业务逻辑处理

5. Netty源码分析之ChannelPipeline 和 ChannelHanler

Netty源码分析 ----- write过程 源码分析

Pipeline的入站流程详解(netty源码死磕7)