程序员进阶路上的高并发Reactor线程模型

Posted 码年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了程序员进阶路上的高并发Reactor线程模型相关的知识,希望对你有一定的参考价值。


程序员进阶路上有一道关卡是多线程编程。熟悉线程的创建,线程池,线程同步和协调,说明多线程的基础比较牢固了。再进一步,就是要提升对不同的应用场景设计出适合的线程模型的能力了,这需要从其他的线程模型中学习,比如本文要探讨的I/O Reactor线程模型。


当下服务器端的网络服务为实现高并发,高吞吐量使用的最为流行的模式是事件驱动(event driven)模式,Reactor模式是事件驱动模式的一种。


说起I/OReactor模式,最经典的文章是Doug Lea的《Scalable IO in Java》,里面介绍了Reactor模式的三种线程模型:单线程,多线程和主从Reactor模型。文章短短几十页,但是说明了高并发IO的线程模型的基础。但是这篇文章比较侧重于基础,也过于简洁,要想在其基础上实现高并发需要有额外的处理。本文先解释论文中的的多线程模型,然后探讨如何改进,进而实现更贴近实际应用的高性能线程模型。


读懂本文需要对Selector有基本的了解,如果不了解,可以读一下我的另一篇文章:《以最简单易懂的方式介绍I/O模型》

Reactor多线程模型

大部分网络服务包括以下处理步骤:

read request (读取客户端发送过来的byte 数据)
decode request (把byte 数据解码成特定类型的数据)
process (compute) service (根据请求数据进行业务处理)
encode reply (把处理结果转换成byte 数据)
send reply (发送byte 数据给客户端)


Reactor多线程模型图(来自于Doug Lea的文章)

程序员进阶路上的高并发Reactor线程模型

Reactor - 负责响应IO事件,把事件分发给相应的处理代码。Reactor运行在一个独立的线程中(非Thread Pool中的线程)。具体来说,Reactor主要有两个职责,一个是处理来自客户端的连接事件,处理代码由acceptor实现;另一个是处理读取和发送数据的事件,处理代码由Handler实现。

Acceptor - 用以接受客户端的连接请求,然后创建Handler对连接进行后续的处理(读取,处理,发送数据)。

Handler - 事件处理类,用以实现具体的业务逻辑。图中readdecodecomputeencodesend都是由handler实现的。

Thread Pool - Thread Pool中的thread被称作worker threadHandler中的decodecomputeencode是用worker thread执行的。值得注意的是Handler中的readsend方法是在Reactor线程而不是worker thread中执行的。这意味着对socket数据的读取发送数据和对数据的处理是在不同的线程中进行的.

程序员进阶路上的高并发Reactor线程模型

Reactor多线程模型的主要问题:

  • Read和send会影响接受客户端连接的性能。

前面分析过read和send是在Reactor线程中执行的,接受客户端的连接请求也是在Reactor线程中执行。这使得如果有read或者send耗时较长,会影响其他客户端连接的速度。

  • Read和send性能不够高效

网络服务对于对于来自同一客户端,的read和send是串行的,但是对于不同客户端之间的read和send是可以并行进行的。由于read和send运行在Reactor单线程中,不能充分发挥硬件能力。

  • 线程上下文切换带来额外开销

前面提到的处理客户端请求的步骤依次是read,decode,process,encode,send。由于read和send是在Reactor线程中执行,而decode,process和encode是在worker thread线程中执行,引入了额外的线程切换开销,这种开销在高并发的时候会体现出来。

实际应用中的多线程模型


Doug Lea文章中的”主从Reactor“模式可以解决上述第一个和第二个问题。它把接受客户端连接的“主Reactor”单独运行在一个线程中,“从Reactor”有多个,组成一个Reactor Pool,每个“从Reactor”都运行在一个独立的线程上,具有自己的selector和dispatch loop。但第三个问题还是没有解决,read和send仍然是运行在”从Reactor“线程上,而decode,process和encode运行在worker thread上。

要解决第三个问题,可以采用下面的方式。

Reactor线程专门用于接受客户端连接(通过acceptor);创建多个Event Loop ,组成Event Loop Pool,每个Event Loop都有自己的Selector,并且运行在独立的线程上;

Acceptor对于每一个客户端的连接从EventLoopPool中选择一个Event Loop进行处理,并且保证每个客户端连接在整个生命周期中都是由同一个Event Loop线程来处理,从而使得Handler中的实现-read,decode,process,encode,send-都在同一个线程中执行。

整个线程模型除了高效的性能,还有非常重要的一点是Handler的实现不需要加锁,一方面对性能有帮助,另一方面避免多线程编程的复杂度。

改进后的模型图

程序员进阶路上的高并发Reactor线程模型

代码示例及注解


程序员进阶路上的高并发Reactor线程模型


class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    static final int EVENT_LOOP_POOL_SIZE = 4;
    final EventLoop[] eventLoopPool = new EventLoop[EVENT_LOOP_POOL_SIZE];
    int currentIndex = 0;

    Reactor(int port) throws IOException {
        // 创建并设置ServerSocketChannel对象
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        // 创建Selector对象,为ServerSocketChannel对象注册OP_ACCEPT事件
        selector = Selector.open();
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        // 在SelectionKey对象中保存Acceptor对象,
        // 这样Acceptor对象可以传递给OP_ACCEPT事件发生时的处理方法
        sk.attach(new Acceptor());
        // 创建EventLoop对象,每个EventLoop对象会启动一个线程
        initEventLoopPool();
    }

    void initEventLoopPool() {
        try {
            for (int i = 0; i < EVENT_LOOP_POOL_SIZE; i++) {
                eventLoopPool[i] = new EventLoop();
            }
        } catch (IOException ex) { /* ... */ }
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select(); // 当没有客户端连接时,此语句会阻塞
                // 执行到以下语句意味着有客户端连接了
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    // 取出保存在SelectionKey中的Acceptor对象,处理连接请求
                    SelectionKey sk = ((SelectionKey) (it.next()));
                    Acceptor acceptor = (Acceptor)sk.attachment();
                    acceptor.accept();
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    class Acceptor {
        public void accept() {
            try {
                // 每个SocketChannel对象都代表一个和客户端的TCP连接,
                // 读取客户端发过来的数据和发送数据给客户端都是通过SocketChannel对象进行
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    // SocketChannel对象的数据读写和处理逻辑是由Handler实现的,
                    // Handler对象的方法是在选中的某一个EventLoop的线程中执行的(为什么?答案在Handler实现中)
                    new Handler(nextEventLoop().selector, c);
            } catch(IOException ex) { /* ... */ }
        }

        EventLoop nextEventLoop() {
            // 循环选择EventLoop(Round Robin方式),使得每个EventLoop负载均衡
            return eventLoopPool[(currentIndex++) % EVENT_LOOP_POOL_SIZE];
        }
    }
}



final class EventLoop implements Runnable {
    final Selector selector;

    EventLoop() throws IOException {
        // 每个EventLoop都有属于自己的Selector对象,
        // 并且运行在独立的线程上
        selector = Selector.open();
        new Thread(this).start();
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next()));
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }
}



final class Handler implements Runnable {
    static final int MAXIN = 5000;
    static final int MAXOUT = 10000;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // 为什么此Handler会在EventLoop中执行?
        // 因为入参Selector对象来自于EventLoop,
        // EventLoop的线程负责调用此Selector对象的select方法等待事件的发生,
        // 事件发生后调用SelectionKey.attachment上的run方法,
        // 由于attachment就是此Handler对象,所以相当于在EventLoop线程中调用此Handler的run方法。
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    boolean inputIsComplete() /* ... */ }
    boolean outputIsComplete() /* ... */ }
    void process() /* ... */ }

    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        // inputIsComplete方法是干什么用的?
        // 客户端发送过来的数据并不一定是全部放到buffer后再通知selector说数据就绪了,
        // 大部分情况是部分数据放到缓存以后就通知selector说数据就绪了。所以此read方法可能会被调用多次。
        // 用户需要在客户端和服务端约定好怎么样才算数据发送完了,
        // 比如可以用一个特殊的字符结尾,或者先用发送过来的头两个字节代表数据的长度等。
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}





今天的分享就到这里了,我们下一篇见。


程序员进阶路上的高并发Reactor线程模型


程序员进阶路上的高并发Reactor线程模型

快来关注我吧



--- 喜欢,就“在看”一下呗 ---

以上是关于程序员进阶路上的高并发Reactor线程模型的主要内容,如果未能解决你的问题,请参考以下文章

Reactor 模型基本并发编程模型

Java NIO的三种Reactor线程模型分析

Reactor线程模型

netty - 线程模型 reactor

netty - 线程模型 reactor

图解Reactor模型