Netty中的ChannelPipeline
Posted 逅弈逐码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty中的ChannelPipeline相关的知识,希望对你有一定的参考价值。
数据在网络中从端到端进行传输时,就是一次IO的过程,在这个过程中一般会有一个Channel。所有的IO操作例如bind、connect、read、write都会在这个Channel中进行。在Netty中每当一个Channel被创建时,系统会自动为其创建一个ChannelPipeline,整个IO的过程会在ChannelPipeline中贯穿。
什么是ChannelPipeline
ChannelPipeline是一个在 {@link Channel} 上处理或者拦截入站事件/出站操作的 {@link ChannelHandler}s 的集合 它为用户提供了全部的能力来控制事件的处理,并且保证了在pipeline中 {@link ChannelHandler}s 之间是怎样相互联系的
ChannelPipeline和Channel是紧密相连的,在Channel的生命周期中ChannelPipeline是唯一的。
除此以外,ChannelPipeline中还有Channel赖以处理各种IO事件的ChannelHandler。并且ChannelHandler是有 方向
的,包括 InboundChannelHandler
, OutboundChannelHandler
,他们共同组成了ChannelPipeline中处理IO事件的基础。
ChannelPipeline中的ChannelHandler
ChannelPipeline中ChannelHandler之间的关系如下图所示:
如上图所示,当入站事件到来的时候,Pipeline中的ChannelHandler的处理顺序是 InboundHandler1
--> InboundHandler2
--> InboundOutboundHandler5
;而出站事件的处理顺序与入站是相反的,顺序为: InboundOutboundHandler5
--> OutboundHandler4
--> OutboundHandler3
ChannelPipeline就像一个 管道
一样,数据ByteBuf就像管道中的 水
,而处理数据的ChannelHandler就像管道中各种各样的 过滤器
。可是当一个 过滤器
处理过之后他是如何知道该传给其他哪个 过滤器
的呢?
上面我们已经知道了在ChannelPipeline中ChannelHandler是有方向的,当处理Inbound事件时是会跳过Outbound事件的,同理处理Outbound事件时也一样。但是仅仅根据ChannelHandler的类型是不够的,在ChannelPipeline中还有另外一个非常重要的类: ChannelHandlerContext
。
ChannelPipeline中的ChannelHandlerContext
正如皮管可以将一个个的水龙头连接起来,形成一条长长的水管一样。ChannelHandlerContext所做的就是将散落在ChannelPipeline中的ChannelHandler连接起来,使得每个ChannelHandler都不再孤立无援。
ChannelHandlerContext在ChannelPipeline中的结构可以用下图表示:
首先每个ChannelPipeline在初始化的时候会构造两个特殊的ChannelHandlerContext: HeadContext
, TailContext
,在上图中分别用 head
和 tail
表示。为什么说他们是特殊的两个ChannelHandlerContext呢?看一下他们的类构造:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
// inbound=false, outbound=true
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// inbound=true, outbound=false
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
}
可以看到HeadContext和TailContext都继承了AbstractChannelHandlerContext,HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler,TailContext实现了ChannelInboundHandler。并且他们在调用父类的构造方法时,传入的参数TailContext为: inbound=true,outbound=false
,HeadContext为: inbound=false,outbound=true
,由此可见HeadContext是一个outbound类型的ChannelHandler,TailContext是一个inbound类型的ChannelHandler。
再看ChannelPipeline的默认实现类DefaultChannelPipeline的构造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
由构造函数可见,head指向了tail,tail指向了head,这样就构成了一个双向的链表,如下图所示:
另外需要注意的是,这两个Context是Netty默认添加到Pipeline中的,当我们不往Pipeline中添加任何ChannelHandler的时候,Pipeline仍然能保证一个完整的链表。当我们需要往Pipeline中添加一些ChannelHandler的时候,此时调用的其实是这个方法:
@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler);
}
最终调用的是以下的方法:
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
// 将ChannelHandler包装为一个ChannelHandlerContext
newCtx = newContext(group, name, handler);
// 将新的Context插入到链表中
addFirst0(newCtx);
// 以下省略部分代码
}
callHandlerAdded0(newCtx);
return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
// 获得原本链表中表头的下一个节点
AbstractChannelHandlerContext nextCtx = head.next;
// 将新节点插入到表头后面
newCtx.prev = head;
// 将原来表头的下一个节点插入到新节点的后面
newCtx.next = nextCtx;
// 表头的next指向新节点
head.next = newCtx;
// 原表头的下一个节点的prev指向新节点
nextCtx.prev = newCtx;
}
可以看到调用了 addFirst
方法后,会将ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表中head节点的后面。添加完之后整个ChannelPipeline变成下图这样:
相应的调用 addLast
方法,会把ChannelHandler包装成一个ChannelHandlerContext,然后把它添加到链表中tail节点的前面。
数据在ChannelPipeline中的传递
知道了ChannelPipeline中ChannelHandlerContext和ChannelHandler的关系以及结构之后,我们就很容量理解数据在ChannelPipeline中是如何进行传递的了。 我们以 channelRead
方法为例,在 ChannelInboundHandlerAdapter
中channelRead方法,如下所示:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter
implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 触发ctx的fireChannelRead方法
ctx.fireChannelRead(msg);
}
}
可以看到实际调用的是ChannelHandlerContext的fireChannelRead方法,我们看一下这个方法:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 获取下一个inboundContext并向后传递msg
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
// 获取pipeline中下一个inbound的ctx
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// touch一个msg
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取ctx的执行器,并执行
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 调用ctx的invokeChannelRead(Object msg)方法
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 获取ChannelHandler之后,调用handler的channelRead方法
// 至此就把数据从一个ChannelHandler传递到了下一个ChannelHandler
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
简单点说,就是InboundChannelHandlerA想把数据msg传递给他的下一个InboundChannelHandlerB,那么整个流程就如下图所示:
以上是关于Netty中的ChannelPipeline的主要内容,如果未能解决你的问题,请参考以下文章
5. Netty源码分析之ChannelPipeline 和 ChannelHanler