Netty框架之责任链模式及其应用
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之责任链模式及其应用相关的知识,希望对你有一定的参考价值。
前言
在上篇博客介绍完netty框架的基本组件介绍和概述,也跟着代码看了下NioEventLoopGroup的启动过程,以及基于Reactor线程模型的解析,它是开发Netty的核心思想,也是整个Netty框架的核心思想;这篇文章分析Netty中责任链模式,该模式给netty框架提供了大量一些扩展,使得netty框架更适合在业务场景上使用。
设计模式 - 责任链模式
Netty中的责任链模式也就是来自设计模式中责任链模式。
类似于上面图的情况,每个部分做的事情不同,并不感染,感觉就像是一个个组件进行组装
责任链的好处
- 降低了对象之间的耦合度。该模式使得一个对象无须知道到底是哪一个对象处理其请求以及链的结构,发送者和接收者也无须拥有对方的明确信息。
- 增强了系统的可扩展性。可以根据需要增加新的请求处理类,满足开闭原则。
- 增强了给对象指派职责的灵活性。当工作流程发生变化,可以动态地改变链内的成员或者调动它们的次序,也可动态地新增或者删除责任。
- 责任链简化了对象之间的连接。每个对象只需保持一个指向其后继者的引用,不需保持其他所有处理者的引用,这避免了使用众多的 if 或者 if···else 语句。
- 责任分担。每个类只需要处理自己该处理的工作,不该处理的传递给下一个对象完成,明确各类的责任范围,符合类的单一职责原则。
缺点
- 不能保证每个请求都被处理,有可能这个处理并不会到末端就抛出掉了某个请求
- 对比较长的职责链,对象是非常多的,有可能会出现占用资源过大的问题
- 职责链建立的合理性要靠客户端来保证,因此也可能出现写出错的情况
在netty中主要就是利用责任链模式,进行业务上处理,达到netty的耦合降低,扩展性好的特点。
实现责任链模式
一定有四个要素:
- 处理抽象类 抽象出方法用于链条的传递 next 方法
- 具体的处理器实现类 用于处理链条之间的数据
- 保存处理器信息 无论是数组实现 还是链条实现 都需要一个容器来存储这些实现类然后让链条连接起来
- 处理执行 开始执行的方法 执行器
代码实现
- 抽象处理类以及 处理抽象类 Pipeline 来进行管理并持有 抽象处理类
/**
* 主要持有类
*
* @author
*
*/
public class Pipeline {
/**
* handler上下文,主要负责维护链 ,和链的执行
*/
class HandlerChainContext {
HandlerChainContext next;// 持有下一个节点
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将节点持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
/**
* 继续执行下一个
*/
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
/**
* 抽象处理类 用于处理链条之间的数据
*
* @author
*
*/
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
}
- 对Pipeline 进行完善
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("这是头部");
context.runNext(arg0);
}
});
- 增加责任链的方法及调用方法
// 添加责任链
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 开始调用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
- 最后创建一个具体实现处理类 和测试代码
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的处理.....";
System.out.println("我是Handler1的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("开始了....");
}
得到下面的打印结果
这是头部
我是Handler1的实例,我在处理:开始了........这是头部......handler1的的处理.....
完整代码
/**
* 主要持有类
*
* @author
*
*/
public class Pipeline {
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("这是头部");
context.runNext(arg0 + "....这是头部....");
}
});
// 添加责任链
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 开始调用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
/**
* handler上下文,主要负责维护链 ,和链的执行
*/
class HandlerChainContext {
HandlerChainContext next;// 持有下一个节点
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将节点持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
/**
* 继续执行下一个
*/
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
/**
* 具体的处理器实现类 用于处理链条之间的数据
*
* @author
*
*/
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的处理.....";
System.out.println("我是Handler1的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("开始了....");
}
}
Netty中的ChannelPipeline责任链
在netty中实现责任链的就是Pipeline管道
- NioEventLoop的run是在初始化时就会启动run方法,然后监听是否有事件过来
- 走到 processSelectedKey 方法中的 事件读取 niomessageunsafe中
- doReadMessages 方法中 创建 一个NioSocketChannel
- 在抽象类AbstractChannel中持有者pipeline
- 会在初始化时就创建 这个pipeline
- Pipeline管道中入站事件 ,以及出站事件
- handler处理器是什么,有什么作用?
-
Pipeline 如何维护 handler 的。
-
handler 的执行。
Pipeline
入站事件:
通常指I/O线程生成了入站数据。(通俗理解: 从socket底层自己往上冒上来的事件都是入站)比如EventLoop收到selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer) 接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用。
native层往应用写数据 包括各个事件就是入站事件。
应用层往native层写数据 连接关闭 写入数据等等都是出站事件
- 在DefaultChannelPipeline中就有头和尾 的Context 类似责任链中 持有的 具体的处理器实现类
- 初始化时 将连接起来 头和尾连接起来
- 当消息过来时就会走AbstractNioByteChannel 中read 方法做包括申请多少内存 以及 pipeline.fireChannelRead(byteBuf); 去读取数据内容 这里的ByteBuf 是封装过了ByteBuffer的一个类。
- 就是fire 开头的都是触发入站的事件
- 而对于出站事件则是 包括wite read bind等方法来表示的
- 源码中对于入站和出战事件,完全按照 入站中头开始,出战按尾部开始
- 继续走下去找到读取的方法
- 调用到下一个channelpipline方法
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
- 在pipeline 中 add 和remove则是维护链表上的数据
- 对于context中也存在许多的读取方法等等
AbstractChannelHandlerContext
作为有 pipeline 以及context 事件的持有者,并在实现的子类中持有对应的handler 利用handler方法,而context就是为了维护链的结构而产生的
所以要对事件进行做处理,因此需要实现 ChannelInboundHandlerAdapter 或者 对应的事件即可
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到客户端数据,还给客户端:" + msg);
ctx.write(msg);
// ctx.channel().write("");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
在使用时,添加进去就行
ServerBootstrap b = new ServerBootstrap();
// 3、配置启动器
b.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelDuplexHandler
这个Handler 是继承ChannelInboundHandlerAdapter 并且实现了ChannelOutboundHandler的既能入站 也可以出站
在源码中有很多这种handler 实现 提供给我们可以我们对入站事件和出站事件进行处理。
Netty中事件的定义
Pipeline中的handler
Pipeline中的handler
ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除。
Handler的执行分析
结构架构图
构成一个channel
构成整体的图
总结
以上是关于Netty框架之责任链模式及其应用的主要内容,如果未能解决你的问题,请参考以下文章