网络IO模型Reactor 模式
Posted 绝世好阿狸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络IO模型Reactor 模式相关的知识,希望对你有一定的参考价值。
/**
* Reactor模式简述
*
* Reactor负责轮询selector,将就绪事件分发给handler处理。
* handler大致有两种:
* 1.acceptor:负责建连,建连后注册io handler;
* 2.io handler:负责处理io读写事件;
*
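* 所以Reactor模式是一种事件响应式模式:需要提前注册handler。select只会返回就绪的selectionKey,本身并不知道事件该由谁处理,
* 因此selectionKey的作用是将handler与channel关联:把handler作为attachment挂在key上(调用key.attach(),或在register时作为第三个参数传入)。
* 当select返回就绪事件时,可以通过selectionKey.attachment()取得注册的handler。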
*
* 需要注意的是:Reactor只是在编程模式上是异步的,但是底层io模型仍然是同步的。
*
* Reactor模式又可以细分为3类:
* 1.单reactor单线程模式:这种模式分发和io均由Reactor线程处理。
* 2.单reactor多线程模式:与1类似,只是io在独立的线程池里处理。reactor只负责分发。
* 3.多reactor多线程模式:在2基础上,使用了多个reactor,进行了分工。
* boss reactor只负责分发建连事件;若干worker reactor负责分发io事件。
*/
1.单reactor单线程
public class SimpleReactor implements Runnable
private Selector selector;
public SimpleReactor(int port)
try
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new Acceptor(selector, serverSocketChannel));
catch (IOException e)
e.printStackTrace();
@Override
public void run()
while (true)
try
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext())
dispatch(iterator.next());
iterator.remove();
catch (IOException e)
e.printStackTrace();
private void dispatch(SelectionKey key)
Runnable runnable = (Runnable) key.attachment();
runnable.run();
public class Acceptor implements Runnable
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel)
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
@Override
public void run()
try
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("new client connected");
// demo仅注册read
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// single thread
key.attach(new ReadWorker(socketChannel));
// worker poll
// key.attach(new ThreadPoolReadProcessor(socketChannel));
catch (IOException e)
e.printStackTrace();
/**
 * Read handler for one connection: performs a single read, prints what was received and then
 * closes the connection (the demo handles exactly one message per client).
 */
public class ReadWorker implements Runnable {

    // NOTE(review): 4098 is kept from the original; it looks like a typo for 4096 - confirm.
    private static final int BUFFER_SIZE = 4098;

    private final SocketChannel socketChannel;

    /** @param socketChannel connection to read from; closed by {@link #run()} */
    public ReadWorker(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * Reads once from the channel and prints the trimmed text and the byte count.
     *
     * <p>The channel is closed on every path that attempted a read, including end-of-stream and
     * I/O errors. Closing also cancels the channel's selection keys; without it a connection at
     * end-of-stream would stay registered and be reported as readable on every select.
     */
    @Override
    public void run() {
        // In multi-thread mode the same key can be dispatched again after another task has
        // already closed the channel, so check before reading.
        if (!socketChannel.isOpen()) {
            return;
        }
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int size = socketChannel.read(byteBuffer);
            if (size == -1) {
                // Peer closed the connection; the finally block releases our side.
                return;
            }
            // Decode only the bytes actually read, with an explicit charset.
            String msg = new String(
                    byteBuffer.array(), 0, size, java.nio.charset.StandardCharsets.UTF_8).trim();
            // %n emits a real line break (the original "\\n" printed a literal backslash-n).
            System.out.printf("receive data: %s, size: %d %n", msg, size);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly();
        }
    }

    /** Closes the connection, reporting but not propagating a failure to close. */
    private void closeQuietly() {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class BootStrap
public static void main(String[] args)
simpleReactor();
// multiReactor();
public static void simpleReactor()
SimpleReactor reactor = new SimpleReactor(15001);
reactor.run();
public static void multiReactor()
BossReactor bossReactor = new BossReactor(15001);
bossReactor.run();
2.单reactor多线程
public class ThreadPoolReadProcessor implements Runnable
// singleton
private static ExecutorService poolExecutor = Executors.newFixedThreadPool(10);
private Runnable readWorker;
public ThreadPoolReadProcessor(SocketChannel socketChannel)
this.readWorker = new ReadWorker(socketChannel);
@Override
public void run()
poolExecutor.submit(readWorker);
新增ThreadPoolReadProcessor,并在Acceptor中替代ReadWorker。
3.多reactor多线程
public class BossReactor implements Runnable
private Selector selector;
public BossReactor(int port)
try
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(new MultiThreadAcceptor(serverSocketChannel));
catch (IOException e)
e.printStackTrace();
@Override
public void run()
while (true)
try
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext())
dispatch(iterator.next());
iterator.remove();
catch (IOException e)
e.printStackTrace();
private void dispatch(SelectionKey key)
Runnable runnable = (Runnable) key.attachment();
runnable.run();
/**
 * Worker reactor of the multi-reactor variant: owns its own selector and dispatches I/O events of
 * the connections registered with it. {@link #run()} is meant to execute on a dedicated thread,
 * while {@link #register(SocketChannel, int)} is called from a different (accepting) thread.
 */
public class WorkerReactor implements Runnable {

    private final Selector selector;

    /**
     * Held by a thread while it registers a channel. The event loop passes through this lock
     * before every {@code select()}, so it cannot start a new selection in the middle of a
     * registration.
     */
    private final Object registrationLock = new Object();

    /** @throws IllegalStateException if the selector cannot be opened */
    public WorkerReactor() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new IllegalStateException("failed to open worker selector", e);
        }
    }

    /**
     * Registers a channel with this reactor's selector; safe to call while the loop is running.
     *
     * <p>The selector is woken first: a registration made while a selection is in progress is
     * either not seen by that selection or, on older JDKs, blocks until it ends. Waking the loop
     * and holding {@code registrationLock} guarantees the next selection starts after the key
     * exists.
     *
     * @param socketChannel non-blocking channel to register
     * @param op interest set, e.g. {@code SelectionKey.OP_READ}
     * @return the new key; the caller is expected to attach a {@code Runnable} handler to it
     * @throws RuntimeException if the channel is already closed
     */
    public SelectionKey register(SocketChannel socketChannel, int op) {
        synchronized (registrationLock) {
            selector.wakeup();
            try {
                return socketChannel.register(selector, op);
            } catch (ClosedChannelException e) {
                throw new RuntimeException("cannot register a closed channel", e);
            }
        }
    }

    /** Blocks in {@code select()} and dispatches each ready key; never returns normally. */
    @Override
    public void run() {
        while (true) {
            try {
                synchronized (registrationLock) {
                    // Intentionally empty: wait here until an in-flight register() has finished.
                }
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Remove first so the key leaves the selected set even if its handler throws.
                    iterator.remove();
                    dispatch(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs the handler attached to {@code key}. A key can become ready before the registering
     * thread has attached its handler; such a key is skipped and, being still ready, is reported
     * again by the next select.
     */
    private void dispatch(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        Runnable handler = (Runnable) key.attachment();
        if (handler != null) {
            handler.run();
        }
    }
}
public class MultiThreadAcceptor implements Runnable
private static final int DEFAULT_SIZE = 10;
private ServerSocketChannel serverSocketChannel;
private WorkerReactor[] workerReactors;
public MultiThreadAcceptor(ServerSocketChannel serverSocketChannel)
this.serverSocketChannel = serverSocketChannel;
this.workerReactors = new WorkerReactor[DEFAULT_SIZE];
for (int i = 0; i < DEFAULT_SIZE; i++)
workerReactors[i] = new WorkerReactor();
// 这里简化处理,最好用线程池
new Thread(workerReactors[i]).start();
@Override
public void run()
try
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("new client connected");
int index = new Random().nextInt(DEFAULT_SIZE);
SelectionKey selectionKey = workerReactors[index].register(socketChannel, SelectionKey.OP_READ);
selectionKey.attach(new ThreadPoolReadProcessor(socketChannel));
catch (IOException e)
e.printStackTrace();
参考:
https://juejin.cn/post/6844904102560792590
https://zhuanlan.zhihu.com/p/146082678
以上是关于网络IO模型Reactor 模式的主要内容,如果未能解决你的问题,请参考以下文章