Netty中的ChannelPipeline

Posted 逅弈逐码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty中的ChannelPipeline相关的知识,希望对你有一定的参考价值。

数据在网络中从端到端进行传输时,就是一次IO的过程,在这个过程中一般会有一个Channel。所有的IO操作例如bind、connect、read、write都会在这个Channel中进行。在Netty中每当一个Channel被创建时,系统会自动为其创建一个ChannelPipeline,整个IO的过程会在ChannelPipeline中贯穿。

什么是ChannelPipeline

ChannelPipeline是一个在 {@link Channel} 上处理或者拦截入站事件/出站操作的 {@link ChannelHandler}s 的集合 它为用户提供了全部的能力来控制事件的处理,并且保证了在pipeline中 {@link ChannelHandler}s 之间是怎样相互联系的

ChannelPipeline和Channel是紧密相连的,在Channel的生命周期中ChannelPipeline是唯一的。

除此以外,ChannelPipeline中还有Channel赖以处理各种IO事件的ChannelHandler。并且ChannelHandler是有 方向的,包括 InboundChannelHandlerOutboundChannelHandler,他们共同组成了ChannelPipeline中处理IO事件的基础。

ChannelPipeline中的ChannelHandler

ChannelPipeline中ChannelHandler之间的关系如下图所示:

如上图所示,当入站事件到来的时候,Pipeline中的ChannelHandler的处理顺序是 InboundHandler1--> InboundHandler2--> InboundOutboundHandler5;而出站事件的处理顺序与入站是相反的,顺序为: InboundOutboundHandler5--> OutboundHandler4--> OutboundHandler3

ChannelPipeline就像一个 管道一样,数据ByteBuf就像管道中的 ,而处理数据的ChannelHandler就像管道中各种各样的 过滤器。可是当一个 过滤器处理过之后他是如何知道该传给其他哪个 过滤器的呢?

上面我们已经知道了在ChannelPipeline中ChannelHandler是有方向的,当处理Inbound事件时是会跳过Outbound事件的,同理处理Outbound事件时也一样。但是仅仅根据ChannelHandler的类型是不够的,在ChannelPipeline中还有另外一个非常重要的类: ChannelHandlerContext

ChannelPipeline中的ChannelHandlerContext

正如皮管可以将一个个的水龙头连接起来,形成一条长长的水管一样。ChannelHandlerContext所做的就是将散落在ChannelPipeline中的ChannelHandler连接起来,使得每个ChannelHandler都不再孤立无援。

ChannelHandlerContext在ChannelPipeline中的结构可以用下图表示:Netty中的ChannelPipeline

首先每个ChannelPipeline在初始化的时候会构造两个特殊的ChannelHandlerContext: HeadContext, TailContext,在上图中分别用 headtail表示。为什么说他们是特殊的两个ChannelHandlerContext呢?看一下他们的类构造:

 
   
   
 
  1. final class HeadContext extends AbstractChannelHandlerContext

  2.          implements ChannelOutboundHandler, ChannelInboundHandler {

  3.  private final Unsafe unsafe;

  4.  HeadContext(DefaultChannelPipeline pipeline) {

  5.      // inbound=false, outbound=true

  6.      super(pipeline, null, HEAD_NAME, false, true);

  7.      unsafe = pipeline.channel().unsafe();

  8.      setAddComplete();

  9.  }

  10. }

  11. final class TailContext extends AbstractChannelHandlerContext

  12.        implements ChannelInboundHandler {

  13.    TailContext(DefaultChannelPipeline pipeline) {

  14.        // inbound=true, outbound=false

  15.        super(pipeline, null, TAIL_NAME, true, false);

  16.        setAddComplete();

  17.    }

  18. }

可以看到HeadContext和TailContext都继承了AbstractChannelHandlerContext,HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler,TailContext实现了ChannelInboundHandler。并且他们在调用父类的构造方法时,传入的参数TailContext为: inbound=true,outbound=false,HeadContext为: inbound=false,outbound=true,由此可见HeadContext是一个outbound类型的ChannelHandler,TailContext是一个inbound类型的ChannelHandler。

再看ChannelPipeline的默认实现类DefaultChannelPipeline的构造方法:

 
   
   
 
  1. protected DefaultChannelPipeline(Channel channel) {

  2.    this.channel = ObjectUtil.checkNotNull(channel, "channel");

  3.    succeededFuture = new SucceededChannelFuture(channel, null);

  4.    voidPromise =  new VoidChannelPromise(channel, true);

  5.    tail = new TailContext(this);

  6.    head = new HeadContext(this);

  7.    head.next = tail;

  8.    tail.prev = head;

  9. }

由构造函数可见,head指向了tail,tail指向了head,这样就构成了一个双向的链表,如下图所示:

Netty中的ChannelPipeline

另外需要注意的是,这两个Context是Netty默认添加到Pipeline中的,当我们不往Pipeline中添加任何ChannelHandler的时候,Pipeline仍然能保证一个完整的链表。当我们需要往Pipeline中添加一些ChannelHandler的时候,此时调用的其实是这个方法:

 
   
   
 
  1. @Override

  2. public final ChannelPipeline addFirst(String name, ChannelHandler handler) {

  3.    return addFirst(null, name, handler);

  4. }

最终调用的是以下的方法:

 
   
   
 
  1. @Override

  2. public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {

  3.    final AbstractChannelHandlerContext newCtx;

  4.    synchronized (this) {

  5.        checkMultiplicity(handler);

  6.        name = filterName(name, handler);

  7.        // 将ChannelHandler包装为一个ChannelHandlerContext

  8.        newCtx = newContext(group, name, handler);

  9.        // 将新的Context插入到链表中

  10.        addFirst0(newCtx);

  11.        // 以下省略部分代码

  12.    }

  13.    callHandlerAdded0(newCtx);

  14.    return this;

  15. }

  16. private void addFirst0(AbstractChannelHandlerContext newCtx) {

  17.    // 获得原本链表中表头的下一个节点

  18.    AbstractChannelHandlerContext nextCtx = head.next;

  19.    // 将新节点插入到表头后面

  20.    newCtx.prev = head;

  21.    // 将原来表头的下一个节点插入到新节点的后面

  22.    newCtx.next = nextCtx;

  23.    // 表头的next指向新节点

  24.    head.next = newCtx;

  25.    // 原表头的下一个节点的prev指向新节点

  26.    nextCtx.prev = newCtx;

  27. }

可以看到调用了 addFirst方法后,会将ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表中head节点的后面。添加完之后整个ChannelPipeline变成下图这样:

相应的调用 addLast方法,会把ChannelHandler包装成一个ChannelHandlerContext,然后把它添加到链表中tail节点的前面。

数据在ChannelPipeline中的传递

知道了ChannelPipeline中ChannelHandlerContext和ChannelHandler的关系以及结构之后,我们就很容量理解数据在ChannelPipeline中是如何进行传递的了。 我们以 channelRead方法为例,在 ChannelInboundHandlerAdapter中channelRead方法,如下所示:

 
   
   
 
  1. public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter

  2.      implements ChannelInboundHandler {

  3.  @Override

  4.  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

  5.      // 触发ctx的fireChannelRead方法

  6.      ctx.fireChannelRead(msg);

  7.  }

  8. }

可以看到实际调用的是ChannelHandlerContext的fireChannelRead方法,我们看一下这个方法:

 
   
   
 
  1. @Override

  2. public ChannelHandlerContext fireChannelRead(final Object msg) {

  3.    // 获取下一个inboundContext并向后传递msg

  4.    invokeChannelRead(findContextInbound(), msg);

  5.    return this;

  6. }

  7. private AbstractChannelHandlerContext findContextInbound() {

  8.    AbstractChannelHandlerContext ctx = this;

  9.    do {

  10.        // 获取pipeline中下一个inbound的ctx

  11.        ctx = ctx.next;

  12.    } while (!ctx.inbound);

  13.    return ctx;

  14. }

  15. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {

  16.    // touch一个msg

  17.    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);

  18.    // 获取ctx的执行器,并执行

  19.    EventExecutor executor = next.executor();

  20.    if (executor.inEventLoop()) {

  21.        // 调用ctx的invokeChannelRead(Object msg)方法

  22.        next.invokeChannelRead(m);

  23.    } else {

  24.        executor.execute(new Runnable() {

  25.            @Override

  26.            public void run() {

  27.                next.invokeChannelRead(m);

  28.            }

  29.        });

  30.    }

  31. }

  32. private void invokeChannelRead(Object msg) {

  33.    if (invokeHandler()) {

  34.        try {

  35.            // 获取ChannelHandler之后,调用handler的channelRead方法

  36.            // 至此就把数据从一个ChannelHandler传递到了下一个ChannelHandler

  37.            ((ChannelInboundHandler) handler()).channelRead(this, msg);

  38.        } catch (Throwable t) {

  39.            notifyHandlerException(t);

  40.        }

  41.    } else {

  42.        fireChannelRead(msg);

  43.    }

  44. }

简单点说,就是InboundChannelHandlerA想把数据msg传递给他的下一个InboundChannelHandlerB,那么整个流程就如下图所示:


以上是关于Netty中的ChannelPipeline的主要内容,如果未能解决你的问题,请参考以下文章

Netty源码之——ChannelPipeline

netty之channelPipeline

netty之channelPipeline

5. Netty源码分析之ChannelPipeline 和 ChannelHanler

Netty源码_ChannelPipeline和ChannelHandlerContext详解

netty的ChannelPipeline执行顺序对inBound和outBound执行器造成的影响