NettyChannelPipeline和ChannelHandler

Posted 皓月行空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NettyChannelPipeline和ChannelHandler相关的知识,希望对你有一定的参考价值。

环境:netty-all-5.0.0.Alpha1.jar

坚持一下,把源码看完,勤奋一点,不要在懒惰了,你已经落下别人很多了

本文主要介绍Netty的ChannelPipeline 和ChannelHandler


一、ChannelPipeline

ChannelPipeline 是ChannelHandler的容器,负责ChannelHandler的管理和事件拦截调度。ChannelPipeline 是线程安全的,通过使用synchronized 关键字来保证线程安全,但是ChannelHandler却不是线程安全的。

ChannelPipeLine的实现类是:DefaultChannelPipeline,在DefaultChannelPipeline中定义了链表的首尾两个节点

    final DefaultChannelHandlerContext head;
    final DefaultChannelHandlerContext tail;

1、ChannelPipeLine的时间处理


(1)底层的SocketChannel read方法读取ByteBuf,触发ChannelRead事件,I/O线程NioEventLoop调用ChannelPipeline的fireChannelRead(Object msg)方法,将消息传输到ChannelPipeline中;

(2)消息依次被HeadHandler、ChannelHandler1、ChannelHandler2........ChannelHandlerN-1、ChannelHandlerN、TailHandler拦截和处理,在这个过程中,任何ChannelHnadler都可以中断当前流程,结束消息传递

(3)调用ChannelHnadlerContext的write方法发送消息,消息从TailHandler开始,途径ChannelHandlerN....ChannelHandler1、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,此过程也可以中断消息传递。


Netty中的事件氛围inbound 和outbound事件。

inbound事件通常由I/O线程触发,例如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等等,它对应上图的左半部分

触发inbound事件的方法如下:

(a)ChannelHandlerContext.fireChannelRegistered();Channel注册事件

(b)ChannelHandlerContext.fireChannelActive();   TCP链路建立成功,Channel激活事件

(c)ChannelHandlerContext.fireChannelRead(Object)  读事件

(d)ChannelHandlerContext.fireChannelReadComplete();  读操作完成通知事件

(e)ChannelHandlerContext.fireExceptionCaught(Throwable); 异常通知事件

(f)ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件

(g)ChannelHandlerContext.fireChannelWritabilityChanged();Channel的可写状态变化通知事件

(h)ChannelHandlerContext.fireChannelInactive():TCP连接关闭,链路不可用通知事件

outbound事件通常是由用户主动发起的网络IO操作,例如用户发起的连接操作,绑定操作和消息发送等操作,对应上图的有半部分。触发outbound事件的方法如下:

(a)ChannelHandlerContext.bind(SocketAddress,ChannelPromise)绑定本地地址事件

(b)ChannelHandlerContext.connect(SocketAddress,SocketAddress,ChannelPromise) 连接服务端事件

(c)ChannelHandlerContext.write(Object,ChannelPromise) 发送事件

(d)ChannelHandlerContext.flush(); 刷新事件

(e)ChannelHandlerContext.read();读事件

(g)ChannelHandlerContext.disconnect(ChannelPromise) 断开连接事件

(h)ChannelHandlerContext.close(ChannelPromise)关闭当前channel事件


2、自定义拦截器

ChannelPipeline 通过ChannelHandler接口来实现事件的拦截和处理,可以通过继承ChannelHandlerAdapter,覆盖对应的方法即可。

	static class TimeServerHandler2 extends ChannelHandlerAdapter
		private  int  counter;
		public void channelRead(ChannelHandlerContext ctx,Object msg)
			String body=(String) msg ;
			System.out.println("the time server receive order : "+body+" ; the counter is : "+ ++counter);
			String currentTime="Query time order".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
			currentTime=currentTime+System.getProperty("line.separator");
			ByteBuf resp=Unpooled.copiedBuffer(currentTime.getBytes());
			ctx.writeAndFlush(resp);
		
		
		
		public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
			ctx.close();
		
	


3、构建pipeline

用户无需自建pipeline,使用Bootstrap启动服务端或客户端时,Netty会为每个Channel连接创建一个独立的pipeline,对于使用者而言,只需将拦截器加入到pipeline中即可。(SocketChannel) ch.pipeline().addLat();



二、ChannelHandler

ChannelHandler类似于Servlet的Filter过滤器,负责处理事件,由于用户自定义的ChannelHandler可能只单独处理某一个事件,所以如果实现ChannelHandler所有接口会造成代码冗余,所以一般会继承一个基类ChannelHandlerAdapter,这样,用户可以根据需要实现对应的事件处理方法。


1、ByteToMessageDecoder:将字节数组或者字节缓冲区解码为业务可以使用POJO对象,但是该类没有考虑粘包和半包的场景。


2、MessageToMessageDecoder:Netty的二次解码器,将一个对象二次解码为其他对象,比如现将TCP读取到的字节数组解码成Request对象,然后对消息体中的对象进行二次解码


3、LengthFieldBasedFrameDecoder:指定消息长度的解码器,其中有一下几个参数:

lengthFieldOffset:长度字段的偏移位置

lengthFieldLength;长度字段所占的字节长度

lengthAdjustment:长度字段包含消息头长度的时候需要减去该属性的值才是消息体的长度

initialBytesToStrip:越过长度字段的字节数,表明直接解析消息体的位置

比如我们定义一个Header:

int crcCode=0x1234abcd; //一个int 占4个字节,一个字节8个位,用二进制表示一个int变量需要32位,切换到16进制,就是8位。

int length;  //表明header消息体的长度  占4个字节

byte type;   //变量type  占1一个字节

那么 我们在创建LengthFieldBasedFrameDecoder 的时候,就可以指定

lengthFieldLength =4;和 length 字段对应,一个int变量占4个字节 

lengthFieldOffset=4;,由于在length之前定义了一个int变量crcCode,所以偏移量是4个字节

lengthAdjustment=-4; 如果length=消息体长度+length所占的字节数,那么lengthAdjustment=-4,

initialBytesToStrip=4;如果跳过请求头长度字段直接对请求体进行解码,则需要跳过头部表示长度的4个字节。


4、MessageToByteEncoder :将POJO对象编码成ByteBuf

5、MessageToMessageEncoder 将一个POJO对象编码成另一个对象


6、ChannelHandler

以上是关于NettyChannelPipeline和ChannelHandler的主要内容,如果未能解决你的问题,请参考以下文章

c:=make(chan int) 和 c:=make(chan int,1) 有啥区别?

chan中可以阻止多少写操作

chan与select的使用及多个chan的并行处理

2中国共 chan党的建立

不能将 chan chan int 类型的变量用作 <-chan <-chan int 值

go 锁 速度 chan 和 mutex的比较