用java构建构建可伸缩的高性能IO服务
Posted 犀牛饲养员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用java构建构建可伸缩的高性能IO服务相关的知识,希望对你有一定的参考价值。
写在前面
本篇文章是java大神人物 Doug Lea 写的一篇关于java io模型的ppt。如果你没听说过这个名字,那 java.util.concurrent
并发包你也一定用过。
ppt一共40页,主要介绍了如何在java中构建可伸缩的高性能IO服务,并且给出了Java网络编程中Reactor模式的几种实现,是一个非常好的学习资料。根据我查的资料,这篇文章写于1997年,后来的netty也是借鉴了里面不少思想。
我把英文原文翻译过来整理成这篇文章。目的一个是自己学习,一个是方便不习惯看英文资料的同学进行学习。
因为原文是ppt,直接翻译有点怪怪的,我加入一些自己的理解,尽量显得连贯一些。
英文原文地址:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
正文开始
可伸缩的网络服务
在一般的网络服务或分布式服务中,大多具有如下的基础结构:
- 接收请求
- 解码请求
- 处理请求
- 对响应数据编码
- 发送数据
在实际的使用过程中,每一步的开销都是不同的,如:xml解析,文件传输,web页面加载,计算服务等。
传统的服务设计如下图所示:
如图所示,每个处理程序都会被分配一个线程。
传统的ServerSocket循环示例代码如下:
class Server implements Runnable
public void run()
try
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
catch (IOException ex)
/* ... */
static class Handler implements Runnable
final Socket socket;
Handler(Socket s)
socket = s;
public void run()
try
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
catch (IOException ex)
/* ... */
private byte[] process(byte[] cmd)
/* ... */
我们设计扩张性的目标是什么?一般包括:
- 负载增加时优雅的降级
- 随着硬件资源的增加,能够不断提高性能
- 满足可用性和性能指标
- 低延迟
- 满足高峰需求
- 可调节服务质量
- 分治处理通常是实现扩展性的最好方法.
那么什么是分治处理呢?包括几个点:
- 将处理分解成更小的任务,每个任务以非阻塞的的方式执行相同的操作。
- 当任务处于启用状态时,才执行(通过io事件触发)
- java nio 对这种机制有支持的,包括:
- 非阻塞的读写
- 将的io事件分发给相关联的任务执行
- 结合事件驱动设计,可以有更多的变化.
事件驱动处理
事件驱动设计与其他方案相比更有效率,表现在几个方面:
- 更少的资源,通常不需要为每个client创建线程
- 更少的开销,减少的线程会使降低上下文的切换,使用更少的锁
- 调度会慢,必须手动将action绑定到事件上.
当然事件驱动设计也导致编码实现更加复杂困难,比如:
-
分解为小的,简单的非阻塞操作
- 类似gui的事件驱动
- 也不能完全消除所有的阻塞:gc, page faults(内存缺页中断)等。
-
跟踪服务的相关逻辑状态
上图为gui中的事件驱动模型设计,事件驱动的基本思路很相似。
Reactor 模式
- reactor模式通过指派相应的处理器来响应IO事件(类似AWT线程)
- 处理器使用非阻塞处理(类似AWT ActionListener)
- 将处理器绑定到事件(类似Awt中 addActionListener)
参见Schmidt etal《pattern-oriented software architecture》第二卷(POSA2),还有 Richard Steven 的《networking books》,Matt Welsh的 《SEDA framework》等
下图是一个基本的Reactor设计
我们先来看一个单线程版本实现示例,在示例中用到了java nio提供了如下机制:
- Channels以非阻塞模式连接文件,套接字并进行读写
- Bufferes是被Channels直接进行读写操作的数组对象
- Selectors判断Channels发生IO事件的选择器
- SelectionKeys负责IO事件状态与绑定
class Reactor implements Runnable
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());=
public void run()
try
while (!Thread.interrupted())
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
catch (IOException ex) /* ... */
void dispatch(SelectionKey k)
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
class Acceptor implements Runnable
public void run()
try
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
catch(IOException ex) /* ... */
final class Handler implements Runnable
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this); //将Handler绑定到SelectionKey上
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
boolean inputIsComplete() /* ... */
boolean outputIsComplete() /* ... */
void process() /* ... */
// 请求处理
public void run()
try
if (state == READING) read();
else if (state == SENDING) send();
catch (IOException ex) /* ... */
void read() throws IOException
socket.read(input);
if (inputIsComplete())
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
void send() throws IOException
socket.write(output);
if (outputIsComplete()) sk.cancel();
使用状态模式(GoF)进行优化,不需要再进行状态的判断。
class Handler // ...
public void run() // initial state is reader
socket.read(input);
if (inputIsComplete())
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
class Sender implements Runnable
public void run() // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
如果用多线程的方式实现,有以下几个关键点:
- 增加worker线程
reactor需要迅速触发处理流程,process()方法会使reactor变慢,将非io操作(process()方法)放到worker thread中
- 增加reactor线程
reactor多线程能够饱和式处理io,将负载分发到其他reactor也可以进行负载均衡,能根据cpu和io使用率进行调整
handler的线程池版实现示例代码如下:
class Handler implements Runnable
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() // ...
socket.read(input);
if (inputIsComplete())
state = PROCESSING;
pool.execute(new Processer());
synchronized void processAndHandOff()
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
class Processer implements Runnable
public void run() processAndHandOff();
关于如何协调任务
- 任务之间的交互,每个任务的启动,执行,传递通常很快,难以控制
- 每个handler中分发器的回调设置状态,返回值等(中介者模式的变体)
- 不同线程的缓冲区问题(基于队列)
- 需要返回值时,线程需要通过join,wait/notify等方法进行同步
将work thread进行池化,使用execute(Runnable r)作为主要方法进行处理,可以通过线程池的参数进行控制,进行调优.如:
- 任务队列的选择
- 最大/最小线程数
- 空闲线程的存活时间
- 拒绝策略
reactor线程的池化处理
- 可以根据io/cpu使用率调整,达到合适的状态
- 每个reactor静态或动态构造,在自己的线程中,包含自己的selector,dispatch loop
- 在主接收器(acceptor)中分发给其他reactor.
示例代码:
Selector[] selectors; // also create threads
int next = 0;
class Acceptor // ...
public synchronized void run() ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
模型如下图所示:
可以再结合java-nio的其他特性,比如:
- 一个reactor拥有多个selector
将不同的handler绑定不同的io时间时,需要小心同步问题。
- 网络文件传输
文件到网络,网络到文件的拷贝
- 内存映射文件
通过缓冲区访问文件
- 直接缓冲区
使用零拷贝,但是会又初始化和释放的开销,适合长期存活的应用。
另外,还有一些和connection相关的扩展:
不再是代替单个服务请求,而是如下的流程:
- client建立连接
- client发送一系列请求
- 客户端断开连接
一些使用示例,如:数据库和事务的监控,多人在线游戏,聊天室等.
能基于网络服务模式进行扩展:
- 处理长期连接的客户端
- 跟踪客户端和会话状态
- 跨主机分发服务(分布式).
java非阻塞IO api
- Buffer
- ByteBuffer
(CharBuffer, LongBuffer, etc not shown.)
- Channel
- SelectableChannel
- SocketChannel
- ServerSocketChannel
- FileChannel
- Selector
- SelectionKey
以上是关于用java构建构建可伸缩的高性能IO服务的主要内容,如果未能解决你的问题,请参考以下文章