Reactor响应模型与实现方式
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactor响应模型与实现方式相关的知识,希望对你有一定的参考价值。
*本文为「码上观世界」原创内容
今日爆料:杭州有家叫某鸭的上市公司,在猎头圈被称做抠鸭,看名字就知道在候选人心目中的形象了。不仅如此,某鸭的面试时间经常安排在饭点(11点);还不顾大多数互联网公司采用视频面试的大势和防疫的需要,坚持第一轮现场面试;而且面试过程中面试官表情无精打采,时常提问,“嗯哼,你说啥”,甚至猎头对接的人事也是隔天回复,“你昨天说的啥”。更有甚者,有朋友面试流程走完了个把月,新公司也入职大半月,他们人事才反应过来追问候选人还来不来。。。以上信息系多渠道汇总,友情提醒各公司重视面试环节体验,一线人事和面试官的散漫态度反应了领导层的无作为和缺乏必要的重视,不可不察。
Reactor响应模式主要应用在高并发请求场景,该场景中,应用服务通常由多个独立的事件关联的处理器组成,Reactor负责事件的监听和调度事件到注册的事件处理器,从而实现非阻塞的并行处理。引入Reactor能给系统设计带来较大的益处,比如高效率、可伸缩,以及由于职责分离带来的可移植性等。本文详细介绍Reactor设计模式和实现方式。
Introduction
下图是是一个分布式日志服务系统:多个客户端同时向服务器提交日志,服务器接收日志并输出到打印机和控制台,对每个客户端来讲,跟服务器的交互有两个步骤:建立连接和提交日志记录。
实现这种日志服务,很自然的想法是通过多线程:主线程acceptor监听客户端请求,accept的客户连接交给新创建的辅线程进行读写,如下图:
上述过程用代码实现如下:
public void run() {
try {
ServerSocket ss = new ServerSocket(port);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) {
/* ... */
}
}
static class Handler implements Runnable {...}
//辅线程执行主体
public void run() {
try {
byte[] input = new byte[1024];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) {
/* ... */
}
}
多线程固然提高了系统的处理效率,但也带来了以下问题:
效率:每个连接创建一个线程,过多的线程上下文切换、线程同步、数据移动反而拖慢了系统的整体效率
编程复杂:多线程要求复杂的并发控制
移植:不是每个操作系统都支持线程
适配:当增加或优化服务,不能带来显著的程序修改
Solution
引入事件分离器和事件处理派遣器,同时从事件分离和处理派遣机制中解耦服务的派遣和实现。比如通过Event Handler接口代替具体的事件处理器实现注册到事件派遣器中。事件派遣器使用同步事件分离器等待事件到来,当事件发生,同步事件分离器通知事件派遣去同步调用注册其上的事件处理器,总体结构如下:
上述Reactor结构包括以下几个关键部分:
句柄(Handles):Handles代表操作系统管理资源的标识,资源通常包括网络连接、打开的文件、定时器、同步对象等,在日志服务示例中,用Handles代表建立的Socket endpoints,有了handles,同步事件分离器就可以在此等待事件的到来,示例中的事件有连接事件和读事件。
同步事件分离器(Synchronous Event Demultiplexer):在一组handles上阻塞等待事件到来,在Windows和Unix系统中可以使用select获取可以进行相应操作(read、write etc.)的handles。
事件处理器(Event Handler)代表应用相关的抽象操作接口,用户实现业务逻辑,由事件派遣器调用。Concrete Event Handler是Event Handler接口的具体实现。
事件派遣器(Initiation Dispatcher):定义注册、移除、派遣Event Handlers的接口。最终,Synchronous Event Demultiplexer负责等待新事件的到来,然后通知Dispatcher回调应用相关的事件处理器。通常事件包括连接、数据可读、数据可写、操作超时等。
他们的协作关系描述为:
1. 应用将Concrete Event Handler注册到Initiation Dispatcher,Initiation Dispatcher建立Event Handler和其handle的关联,当跟该Handle关联的事件到来,Initiation Dispatcher通知相应类型的事件处理器Event Handler
2. 所有的Event Handlers注册完毕,Initiation Dispatcher启动事件循环,使用Synchronous Event Demultiplexer在所有关联的handles上等待事件到来
3. 当有事件到来,Synchronous Event Demultiplexer 通知 Initiation Dispatcher
4. Initiation Dispatcher根据handle触发相应的Event Handler,执行应用相关的业务逻辑执行
下面通过两个操作场景来描述上述部分的协作过程:
这是客户端连接日志服务器的场景:
1. Logging Acceptor作为一种Event Handler实现注册到Initiation Dispatcher
2. Initiation Dispatcher通过handle_events方法启动事件循环
3. Initiation Dispatcher通过Synchronous Event Demultiplexer select方法调用,等待连接请求到来
4. 客户端发起连接请求
5. Initiation Dispatcher得到连接事件通知,并同步调用注册的 Logging Acceptor
6. Logging Acceptor调用accept获取新连接
7. Logging Acceptor创建新的logging Handler服务该连接
8. Logging Handler注册自己到Initiation Dispatcher,通知Dispatcher当读事件到来时,调用自己
这是客户端向日志服务器发送日志的场景:
1. 客户端通过send方法写入日志记录
2. Initiation Dispatcher得到可读事件通知,并同步调用注册的 Logging Handler
3. Logging Handler以非阻塞的方式读取客户端数据,重复步骤2-3,直到读取完成
4. Logging Handler处理日志记录并输出
5. Logging Handler完成处理,控制权返回给Initiation Dispatcher
实现
Initiation Dispatcher 使用 Synchronous Event Demultiplexer同步等待事件到来,Synchronous Event Demultiplexer选用操作系统支持的select调用。JDK NIO提供了功能完善的API,因此接下来基于NIO API来描述实现上述功能。实现Initiation Dispatcher并不完全按照示意图中结构,比如下面的代码示例中用ServerSocketChannel和Selector实现了服务端口绑定、ACCEPT连接事件的注册和事件循环、派遣:
BasicReactorServer(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
public void run() { // normally in a newThread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
/* ... */
}
}
派遣功能实现逻辑为:
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) r.run();
}
其中SelectionKey代表每个事件。
事件处理器的定义为简单起见,继承Runnable接口,应用相关的逻辑只需要实现run方法即可,事件处理器有两种:连接Acceptor和读写Handler。Acceptor逻辑为通过accept建立新的连接,同时创建新的Handler处理读写请求,示例代码如下:
final class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) new Handler(selector, socketChannel);
} catch (IOException ex) {
/* ... */
}
}
}
Handler实现对读写请求处理的逻辑,代码示例如下:
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup(); // 注册OP_READ兴趣之后,让select()方法返回,接受要读取的数据
}
@Override
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) {
/* ... */
}
}
以上实现了Reactor的核心部分功能,Reactor逻辑结构可以描述为下图的样子:
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 会同步完成 Read→业务处理→Send 的完整业务流程。
该模型实现简单,没有并发同步问题,但是无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。特别地,当主线程出现异常,整个系统失去相应,存在单点故障。真正投入使用该系统,还有很多问题要解决,比如并发控制、性能、扩展性等,比如示例中Handler在一个实例中同步完成读数据、处理到写数据整个过程,实际上I/O阻塞可以发生在这三个环节的任意阶段,因此有必须将该处理链条拆分为更小的任务,并发处理。另外Initiation Dispatcher 使用了单例模式,但是一些操作系统对单例线程中等待的handles数量有限制,因此有必要用多Dispatcher实例模式。接下来以两种方案来描述优化方式。
优化
第一种方案为单Reactor、Handler用线程池来承载,该方案的基本思路是通过多线程,将对数据的业务处理过程放在线程池中执行,而Reactor只负责accept新请求和读写请求。逻辑结构如下图:
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后续的各种事件;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
5)Worker 线程会分配独立的线程完成真正的业务处理,然后将响应结果发给 Handler 进行处理;
6)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
总体实现流程框架如下:
public void run() { // normally in a newThread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) {
/* ... */
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
System.out.println("run:" + this.toString() + ":" + r.toString());
r.run();
}
}
这是事件循环和派遣主逻辑,通过Runnable run接口方法同步调用read和write过程,Handler 相关逻辑同上,区别是在Hander中引入了线程池:
static class Handler implements Runnable {
static ExecutorService executorService = Executors.newFixedThreadPool(5);
在Handler中处理Read和Write过程,将业务处理过程放在线程池中执行,这三个过程通过State切换:初始时候为READING执行read过程,当读取完成,过程变换到process过程,process在Worker线程池中调度,在process之后注册兴趣事件为WRITE,为执行send逻辑做好准备,最后响应WRITE事件切换到send返回响应。如下代码片段所示:
synchronized void read() throws IOException {
socket.read(input);
System.out.println("Read:" + new String(input.array()));
if (inputIsComplete()) {
state = PROCESSING;
executorService.execute(new Processor());
}
}
synchronized void send() throws IOException {
input.flip();
String response = "echo:" + new String(input.array());
ByteBuffer byteBuffer = ByteBuffer.allocate(response.getBytes().length);
byteBuffer.put(response.getBytes());
byteBuffer.flip();
int cnt=socket.write(byteBuffer);
System.out.println(String.format("Send:%s (%d bytes)",new String(byteBuffer.array()),cnt));
if (outputIsComplete()) sk.cancel();
}
第二种方案为多Reactor、Handler用线程池来承载,该方案的逻辑出发点是尽可能利用多核并发优势,将Reactor分为一个主Reactor(专门负责请求连接的建立)和多个辅Reactor(专门负责连接的读写处理),每个Reactor里有独立的Selector、事件循环和事件处理器,总体结构如下图:
1)Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件;
2)Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理;
3)SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;
4)当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;
5)Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
6)Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;
7)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
代码实现的主体逻辑结构为:
本文为论文解读和代码实现类文章,部分内容收费,文章收费部分主要为优化方案的实现,提供完整的代码示例,包括NIO的经典模式、多线程模式和多Reactor模式。付费文章首发微信公众号:码上观世界。
以上是关于Reactor响应模型与实现方式的主要内容,如果未能解决你的问题,请参考以下文章