JAVA Reactor模型

Posted xc技术天堂

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA Reactor模型相关的知识,希望对你有一定的参考价值。

一.Reactor是一种设计模式。基于事件驱动,然后通过事件分发器,将事件分发给对应的处理器进行处理。 

Reactor:监听网络端口,分发网络连接事件给Acceptor,具体的感兴趣读写事件handler Acceptor:接受新的连接,连接的读写事件操作交给相应的Handler Handler:注册为callback对象,并且注册自己感兴趣的读事件或者写事件等等,然后再相应的方法内进行业务操作内容

1.单线程版


参考代码:

package com.ddcx.utils;
/** * @author: xc * @ClassName: Test * @Date: 2021-03-03 12:47 * @Description: */
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); //要监听的网络端口号 serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); }
@Override public void run() { try { while (!Thread.interrupted()) { //阻塞到至少有一个通道在你注册的事件上就绪了。 selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } }
void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { //这里是Acceptor的run方法 r.run(); } }
// inner class class Acceptor implements Runnable {
@Override public void run() { try { //阻塞到获取网络连接通道 SocketChannel channel = serverSocket.accept(); if (channel != null) { //连接已经就绪,将相应的感兴趣的读写事件注册到回调中 new ReadHander(selector, channel); } } catch (IOException ex) { /* ... */ } } }

public static void main(String[] args) throws IOException { Reactor reactor = new Reactor(9000); reactor.run(); }}

package com.ddcx.utils;
/** * @author: xc * @ClassName: ReadHander * @Date: 2021-03-03 12:48 * @Description: */
import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;
class ReadHander implements Runnable { final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(90); ByteBuffer output = ByteBuffer.allocate(400); static final int READING = 0, SENDING = 1; int state = READING;
ReadHander(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now sk = channel.register(selector, 0);
//将Handler作为callback对象 sk.attach(this);
//第二步,注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); }
boolean inputIsComplete() { /* ... */ return false; }
boolean outputIsComplete() {
/* ... */ return false; }
void process() { /* ... */ return; }
@Override public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } }
void read() throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //第三步,接收write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); } }
void send() throws IOException { channel.write(output);
//write完就结束了, 关闭select key if (outputIsComplete()) { sk.cancel(); } }}

①通过Selector的select()方法可以选择已经准备就绪的通道

②通过ServerSocketChannel.accept()方法监听新进来的连接.当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel.因此,accept()方法会一直阻塞到有新连接到达.通常不会仅仅只监听一个连接单线程版Reactor模型,其实就是做了一件事情,就是把要监听的socket端口注册到selector中去,并且轮询线程内可以获取到多个已经准备就绪的socket连接通道,同时进行处理这些事件

2. 多线程Reactor模型


多线程主要体现在handler处理的时候,因为处理的事件可能耗时相对于久一些,这样做可以更快的处理感兴趣的事件

selectionKey.attach(new HandlerThreadPool(socketChannel));


3.主从模式多线程

1.mainReactor负责监听socket连接,用来处理新连接的建立和就绪,将建立的socketChannel指定注册给subReactor。网络连接的建立一般很快,所以这里一个主线程就够了

2.subReactor一般是cpu的核心数,将连接加入到连接队列进行监听,并创建handler进行各种事件处理;当有新事件发生时,subreactor就会调用对应的handler处理,而对具体的读写事件业务处理的功能交给handler线程池来完成。

以上是关于JAVA Reactor模型的主要内容,如果未能解决你的问题,请参考以下文章

JAVA Reactor模型

Java IO篇:什么是 Reactor 网络模型?

Java面试题之谈谈reactor模型

Netty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介

1. Netty准备知识:Java NIO

BIO/NIO 线程模型以及高性能通讯框架 Netty Reactor 模型初探