程序员进阶路上的高并发Reactor线程模型
Posted 码年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了程序员进阶路上的高并发Reactor线程模型相关的知识,希望对你有一定的参考价值。
程序员进阶路上有一道关卡是多线程编程。熟悉线程的创建,线程池,线程同步和协调,说明多线程的基础比较牢固了。再进一步,就是要提升对不同的应用场景设计出适合的线程模型的能力了,这需要从其他的线程模型中学习,比如本文要探讨的I/O Reactor线程模型。
当下服务器端的网络服务为实现高并发,高吞吐量使用的最为流行的模式是事件驱动(event driven)模式,Reactor模式是事件驱动模式的一种。
说起I/O的Reactor模式,最经典的文章是Doug Lea的《Scalable IO in Java》,里面介绍了Reactor模式的三种线程模型:单线程,多线程和主从Reactor模型。文章短短几十页,但是说明了高并发IO的线程模型的基础。但是这篇文章比较侧重于基础,也过于简洁,要想在其基础上实现高并发需要有额外的处理。本文先解释论文中的的多线程模型,然后探讨如何改进,进而实现更贴近实际应用的高性能线程模型。
读懂本文需要对Selector有基本的了解,如果不了解,可以读一下我的另一篇文章:《以最简单易懂的方式介绍I/O模型》
大部分网络服务包括以下处理步骤:
Reactor多线程模型图(来自于Doug Lea的文章)
Reactor - 负责响应IO事件,把事件分发给相应的处理代码。Reactor运行在一个独立的线程中(非Thread Pool中的线程)。具体来说,Reactor主要有两个职责,一个是处理来自客户端的连接事件,处理代码由acceptor实现;另一个是处理读取和发送数据的事件,处理代码由Handler实现。
Acceptor - 用以接受客户端的连接请求,然后创建Handler对连接进行后续的处理(读取,处理,发送数据)。
Handler - 事件处理类,用以实现具体的业务逻辑。图中read,decode,compute,encode和send都是由handler实现的。
Thread Pool - Thread Pool中的thread被称作worker thread。Handler中的decode,compute和encode是用worker thread执行的。值得注意的是Handler中的read和send方法是在Reactor线程而不是worker thread中执行的。这意味着对socket数据的读取发送数据和对数据的处理是在不同的线程中进行的.
Reactor多线程模型的主要问题:
Read和send会影响接受客户端连接的性能。
前面分析过read和send是在Reactor线程中执行的,接受客户端的连接请求也是在Reactor线程中执行。这使得如果有read或者send耗时较长,会影响其他客户端连接的速度。
Read和send性能不够高效
网络服务对于对于来自同一客户端,的read和send是串行的,但是对于不同客户端之间的read和send是可以并行进行的。由于read和send运行在Reactor单线程中,不能充分发挥硬件能力。
线程上下文切换带来额外开销
前面提到的处理客户端请求的步骤依次是read,decode,process,encode,send。由于read和send是在Reactor线程中执行,而decode,process和encode是在worker thread线程中执行,引入了额外的线程切换开销,这种开销在高并发的时候会体现出来。
Doug Lea文章中的”主从Reactor“模式可以解决上述第一个和第二个问题。它把接受客户端连接的“主Reactor”单独运行在一个线程中,“从Reactor”有多个,组成一个Reactor Pool,每个“从Reactor”都运行在一个独立的线程上,具有自己的selector和dispatch loop。但第三个问题还是没有解决,read和send仍然是运行在”从Reactor“线程上,而decode,process和encode运行在worker thread上。
要解决第三个问题,可以采用下面的方式。
Reactor线程专门用于接受客户端连接(通过acceptor);创建多个Event Loop ,组成Event Loop Pool,每个Event Loop都有自己的Selector,并且运行在独立的线程上;
Acceptor对于每一个客户端的连接从EventLoopPool中选择一个Event Loop进行处理,并且保证每个客户端连接在整个生命周期中都是由同一个Event Loop线程来处理,从而使得Handler中的实现-read,decode,process,encode,send-都在同一个线程中执行。
整个线程模型除了高效的性能,还有非常重要的一点是Handler的实现不需要加锁,一方面对性能有帮助,另一方面避免多线程编程的复杂度。
改进后的模型图
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
static final int EVENT_LOOP_POOL_SIZE = 4;
final EventLoop[] eventLoopPool = new EventLoop[EVENT_LOOP_POOL_SIZE];
int currentIndex = 0;
Reactor(int port) throws IOException {
// 创建并设置ServerSocketChannel对象
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// 创建Selector对象,为ServerSocketChannel对象注册OP_ACCEPT事件
selector = Selector.open();
SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
// 在SelectionKey对象中保存Acceptor对象,
// 这样Acceptor对象可以传递给OP_ACCEPT事件发生时的处理方法
sk.attach(new Acceptor());
// 创建EventLoop对象,每个EventLoop对象会启动一个线程
initEventLoopPool();
}
void initEventLoopPool() {
try {
for (int i = 0; i < EVENT_LOOP_POOL_SIZE; i++) {
eventLoopPool[i] = new EventLoop();
}
} catch (IOException ex) { /* ... */ }
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); // 当没有客户端连接时,此语句会阻塞
// 执行到以下语句意味着有客户端连接了
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
// 取出保存在SelectionKey中的Acceptor对象,处理连接请求
SelectionKey sk = ((SelectionKey) (it.next()));
Acceptor acceptor = (Acceptor)sk.attachment();
acceptor.accept();
}
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
class Acceptor {
public void accept() {
try {
// 每个SocketChannel对象都代表一个和客户端的TCP连接,
// 读取客户端发过来的数据和发送数据给客户端都是通过SocketChannel对象进行
SocketChannel c = serverSocket.accept();
if (c != null)
// SocketChannel对象的数据读写和处理逻辑是由Handler实现的,
// Handler对象的方法是在选中的某一个EventLoop的线程中执行的(为什么?答案在Handler实现中)
new Handler(nextEventLoop().selector, c);
} catch(IOException ex) { /* ... */ }
}
EventLoop nextEventLoop() {
// 循环选择EventLoop(Round Robin方式),使得每个EventLoop负载均衡
return eventLoopPool[(currentIndex++) % EVENT_LOOP_POOL_SIZE];
}
}
}
final class EventLoop implements Runnable {
final Selector selector;
EventLoop() throws IOException {
// 每个EventLoop都有属于自己的Selector对象,
// 并且运行在独立的线程上
selector = Selector.open();
new Thread(this).start();
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey) (it.next()));
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
}
final class Handler implements Runnable {
static final int MAXIN = 5000;
static final int MAXOUT = 10000;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
// 为什么此Handler会在EventLoop中执行?
// 因为入参Selector对象来自于EventLoop,
// EventLoop的线程负责调用此Selector对象的select方法等待事件的发生,
// 事件发生后调用SelectionKey.attachment上的run方法,
// 由于attachment就是此Handler对象,所以相当于在EventLoop线程中调用此Handler的run方法。
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
// inputIsComplete方法是干什么用的?
// 客户端发送过来的数据并不一定是全部放到buffer后再通知selector说数据就绪了,
// 大部分情况是部分数据放到缓存以后就通知selector说数据就绪了。所以此read方法可能会被调用多次。
// 用户需要在客户端和服务端约定好怎么样才算数据发送完了,
// 比如可以用一个特殊的字符结尾,或者先用发送过来的头两个字节代表数据的长度等。
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
今天的分享就到这里了,我们下一篇见。
快来关注我吧
--- 喜欢,就“在看”一下呗 ---
以上是关于程序员进阶路上的高并发Reactor线程模型的主要内容,如果未能解决你的问题,请参考以下文章