ACE_TAO 008 Reactor基本原理与说明
Posted islinyoubiao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ACE_TAO 008 Reactor基本原理与说明相关的知识,希望对你有一定的参考价值。
Reactor框架是ACE各个框架中最基础的一个框架,其他框架或多或少用到了Reactor框架。本文先分析Reactor构架模式的基本原理,然后利用ACE Reactor框架实现服务器程序,最后分析Reactor框架的实现。
Reactor框架支持的事件类型包括IO事件、信号量事件、定时器事件和Notify事件。示例应用属于IO事件,所以先从IO事件开始分析。
下面从意图、上下文、问题、解决方案、结构和实现6个方面的内容来分析。
1。意图
在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调试(dispatch)给应用程序 。
2。上下文
在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。
3。问题
在分布式系统尤其是服务器这一类事件驱动应用中,虽然这些请求最终会被序列化地处理,但是必须时刻准备着处理多个同时到来的服务请求。在实际应用中,这些请求总是通过一个事件(如CONNECTOR、READ、WRITE等)来表示的。在有序地处理这些服务请求之前,应用程序必须先分离和调度这些同时到达的事件。为了有效地解决这个问题,我们需要做到以下4方面:
a.为了提高系统的可测量性和反应时间,应用程序不能长时间阻塞在某个事件源上而停止对其他事件的处理,这样会严重降低对客户端的响应速度;
b.为了提高吞吐量,任何没有必要的上下文切换、同步和CPU之间的数据移动都要避免;
c.引进新的服务或改良已有的服务都要对既有的事件分离和调度机制带来尽可能小的影响;
d.大量的应用程序代码需要隐藏在复杂的多线程和同步机制之后。
4。解决方案
在一个或多个事件源上等待事件的到来,例如,一个已经连接的Socket描述符就是一个事件源。将事件的分离和调度整合到处理它的服务中,而将分离和调度机制从应用程序对特定事件的处理中分离开,也就是说分离和调度机制与特定的应用程序无关。
具体来说,每个应用程序提供的每个服务都有一个独立的事件处理器与之对应。由事件处理器处理来自事件源的特定类型的事件。每个事件处理器都要事先注册到Reactor管理器中。Reactor管理器使用同步事件分离器在一个或多个事件源中等待事件的发生。当事件发生后,同步事件分离器通知Reactor管理器,最后由Reactor管理器调度和该事件相关的事件处理器来完成请求的服务。
5。结构
在Reactor模式中,有5个关键的参与者。
a.描述符(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端连接请求、数据等。事件也可以来自内部,如定时器事件。
b.同步事件分离器(demultiplexer):是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。Linux的select函数是一个经常被使用的分离器。
c.事件处理器接口(event handler):是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。
d.具体的事件处理器:是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
e.Reactor管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模板函数来处理这个事件。
通过上述分析,是Reator管理器而不是应用程序负责等待事件、分离事件和调度事件。实际上,Reator管理器并没有被具体的事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件做出处理,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成。
要设计和实现一个简单Reactor框架以运行IO事件,需要实现两个组件:事件处理器接口和Reator管理器。至于其他组件,如同步事件分离器可以使用操作系统提供的select、poll或其他类似的函数;而描述符可以使用文件描述符或其他可以识别事件的数据结构,一般操作系统都会提供。事件处理器接口包含一系列模板函数,可以根据实际处理的数据进行设计;Reactor管理器肩负事件的分离和调度,是整个框架设计的核心。
ACE的Reactor框架在Linux平台下使用文件描述符作为IO事件的描述符,使用ACE_Event_Handler类作为各类事件的处理器接口。将同步事件分离函数放到Reactor管理器中,这样使用不同的同步事件分离函数就需要实现不同的Reactor管理器。ACE使用Bridge设计模式解决了这一问题,将与同步事件分离函数相关的操作放到Bridge设计模式的Implementor中。凡是ACE支持的同步事件分离函数都会有一个具体的Implementor与之对应。
ACE的Reactor管理器还提供了用于实现Singleton设计模式的操作,使用这些操作时,一个进程只能有一个全局的Reactor管理器。有调用Singleton设计模式接口时,Reactor管理器会在启动时根据操作系统的配置选择一个具体的Implementor。当然,如果你不喜欢这个默认的Implementor,可以通过函数进行更换。为了提高整个系统对事件分离和调度的性能,ACE还允许应用程序创建多个Reactor管理器实例。在这种情况下,应用程序将不能调用用于Singleton设计模式的操作,只能直接使用Reactor管理器实例对象的方法实现对事件的分离和调度。同时提供这两种使用方法,可以更大程序地满足应用程序的苛刻要求。
ACE实现的Reactor框架结构要比Reactor构架模式中分析的结构复杂得多。这是因为ACE的Reactor框架除了处理IO事件之外,还要处理定时器、信号量等常见事件,并且所有这些处理都必须满足跨平台的要求。要将对这些事件的处理抽象出来,并且提供给应用程序一个统一的接口,ACE的Reactor管理器的实现还采用了Facade设计模式。实际上,Reactor框架管理的IO事件、信号量事件、定时器事件和Notify事件在实现上都有一个小的组件与之对应,这样可以将Reactor管理器与具体的事件处理解耦。使用Facade设计模式,将这些小的组件的接口封装起来,使得应用程序 无法感知它们的存在,可以减少应用程序处理对象的数目,并且使得这些小的组件使用起来更加方便。
以上分析的3种设计模式以及Factory设计模式,在ACE的框架管理器的实现中被频繁使用。这些设计模式以及它们的使用,既为我们学习设计模式提供了非常好的场景,又为我们实现软件框架管理器提供了实用的方法。ACE的Reactor框架与框架五元素的对应关系非常密切,是一个典型的事件驱动型框架,它为我们打开了ACE的框架之门,是学习其他框架的基础。
Reactor框架应用示例
用Reactor框架实现一个简单的服务器程序。这个服务器程序等待客户的连接请求,一旦请求到来,Socket连接建立后,服务器程序简单地打印客户端的地址信息和接收的数据,最后将新建的Socket关闭。
代码如下:
handle_data.h
#ifndef HANDLE_DATA_H
#define HANDLE_DATA_H
#include <ace/OS_NS_time.h>
#include <ace/Time_Value.h>
#include <ace/QtReactor/QtReactor.h>
#include <ace/Event_Handler.h>
#include <ace/Argv_Type_Converter.h>
#include <ace/Acceptor.h>
#include <ace/Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Dgram.h>
class Handle_data : public ACE_Event_Handler
public:
//Handle_data的构造函数。事件处理器对象有一个指针指向Reactor管理器,默认
//使用Reactor管理器的Instance函数返回的Singleton对象
Handle_data(ACE_Reactor *r = ACE_Reactor::instance()):ACE_Event_Handler(r)
//open函数是Handle_data类的初始化函数
ACE_INT32 open();
//handle_xxx函数是事件处理器接口的模板函数,具体的事件处理器可以根据对事件的处理有选择地实现
ACE_INT32 handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);
ACE_INT32 handle_close(ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask mask=0);
//每一个具体的IO事件处理器都和一个描述符对应,get_handle函数被框架调用,用于返回事件处理器的描述符,被
//描述符用于框架分离和调度事件
ACE_HANDLE get_handle() const
return peer_.get_handle();
ACE_SOCK_Stream &get_peer()
return peer_;
private:
~Handle_data()ACE_DEBUG((LM_DEBUG, "Handle data ! dctor.\\n"));;
private:
//ACE_SOCK_Stream数据成员表示一个流,用于保存Socket描述符和对Socket进行IO操作
ACE_SOCK_Stream peer_;
;
#endif // HANDLE_DATA_H
handle_data.cpp
#include "handle_data.h"
ACE_INT32 Handle_data::open()
ACE_INT32 ret = 0;
ACE_INET_Addr remote_addr;
//获取客户端的地址,保存在remote_addr中
get_peer().get_remote_addr(remote_addr);
ACE_DEBUG((LM_DEBUG, "The Remote addr is %s\\n", remote_addr.get_host_addr()));
//调用register_handler函数将事件处理器注册到Reactor框架中,注册的事件类型为
//ACE_Event_Handler::READ_MASK,表示只处理读事件
ret = reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);
if(ret != -1)
ACE_DEBUG((LM_DEBUG, "Handle data register ok\\n"));
return ret;
ACE_INT32 Handle_data::handle_input(ACE_HANDLE)
ACE_INT8 buf[512] = 0;
ACE_INT32 len;
len = get_peer().recv(buf, 500);
//如果接收到数据长度大于0,那么将数据输出,然后返回0
if(len > 0)
ACE_DEBUG((LM_DEBUG, "Rev data %s, len:%d.\\n", buf, len));
return 0;
//如果接收的数据长度等于0,那么表示对端关闭了Socket,返回-1
else if(len == 0)
ACE_DEBUG((LM_DEBUG, "Rev data eln is 0, client exit.\\n"));
return -1;
//如果调用发生错误,则返回-1
else
ACE_DEBUG((LM_DEBUG, "Rev data error len<0\\n"));
return -1;
ACE_INT32 Handle_data::handle_close(ACE_HANDLE, ACE_Reactor_Mask mask)
get_peer().close(); //关闭Socket
ACE_DEBUG((LM_DEBUG, "Handle data close.\\n"));
delete this;//释放事件处理器对象
return 0;
Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H
#include "handle_data.h"
class Acceptor : public ACE_Event_Handler
public:
Acceptor(ACE_Reactor *r = ACE_Reactor::instance()):ACE_Event_Handler(r)
ACE_INT32 open(const ACE_UINT16 port);
ACE_INT32 handle_input(ACE_HANDLE = ACE_INVALID_HANDLE);
ACE_INT32 handle_close(ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask = 0);
ACE_HANDLE get_handle() const
return acceptor_.get_handle();
ACE_SOCK_Acceptor &get_acceptor()return acceptor_;
private:
~Acceptor();
private:
ACE_SOCK_Acceptor acceptor_;
;
#endif // ACCEPTOR_H
Acceptor.cpp
#include "acceptor.h"
ACE_INT32 Acceptor::open(const ACE_UINT16 port)
ACE_INET_Addr addr;
//初始化addr,侦听的端口由port设定,IP地址任意
addr.set(port, (ACE_UINT32)INADDR_ANY);
//调用ACE_SOCK_Acceptor对象的open函数,建立侦听Socket
//如果操作失败,返回-1。
if(acceptor_.open(addr) == -1)
ACE_DEBUG((LM_DEBUG, "accept open error.\\n"));
return -1;
//调用Reactor管理器的register_handler函数,将自身注册到Reactor框架中
return reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
ACE_INT32 Acceptor::handle_input(ACE_HANDLE)
//调用ACE_NET_RETURN宠创建Handle_data对象。
//一个新的连接已经建立,需要创建一个事件处理器来处理该连接上的IO操作
Handle_data *handle_data = 0;
ACE_NEW_RETURN(handle_data, Handle_data(reactor()), -1);
//ACE_NEW_RETURN是ACE使用的内存申请宏。通过ACE_SOCK_Acceptor的accept调用
//接收客户端连接,accept函数会将新的Socket描述符保存在handle_data对象的peer_数据成员中。
//如果调用rwm失败,那么释放handle_data,然后返回-1
if(acceptor_.accept(handle_data->get_peer()) == -1)
ACE_DEBUG((LM_DEBUG, "Accept handle input accept error\\n"));
return -1;
//调用open函数初始化handle_data。如果初始化失败,那么调用handle_close函数执行关闭操作,返回-1
else if(handle_data->open() == -1)
ACE_DEBUG((LM_DEBUG, "Accept handle input open error\\n"));
handle_data->handle_close();
return -1;
else
ACE_DEBUG((LM_DEBUG, "Accept handle input ok\\n"));
return 0;
ACE_INT32 Acceptor::handle_close(ACE_HANDLE, ACE_Reactor_Mask)
acceptor_.close();//关闭侦听的Socket
delete this;//释放事件处理器对象
ACE_DEBUG((LM_DEBUG, "Accept handle close ok\\n"));
return 0;
QACEApplication.h
#ifndef QACEAPPLICATION_H
#define QACEAPPLICATION_H
#include /**/ <QtWidgets/QApplication>
class QACEApplication : public QApplication
Q_OBJECT
public:
typedef QApplication inherited;
public:
QACEApplication( int argc, char *argv[] );
virtual int exec( int msec = 0 );
public slots:
virtual void finishTest(); //!< slot to finish the test, connected to finishTimer_
;
#endif // QACEAPPLICATION_H
QACEApplication.cpp
#include "qaceapplication.h"
#include <ace/OS_NS_time.h>
#include <ace/Time_Value.h>
#include <ace/QtReactor/QtReactor.h>
#include <ace/Event_Handler.h>
#include <ace/Argv_Type_Converter.h>
#include <ace/Acceptor.h>
#include <ace/Connector.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Connector.h>
#include <ace/SOCK_Dgram.h>
#include "handle_data.h"
#include "acceptor.h"
QACEApplication::QACEApplication (int argc, char *argv[]):
QApplication (argc, argv, false) /* do not enable GUI */
//FUZZ: disable check_for_lack_ACE_OS
//connect (&finishTimer_, SIGNAL (timeout ()), this, SLOT (finishTest ()));
//FUZZ: enable check_for_lack_ACE_OS
void QACEApplication::finishTest ()
ACE_OS::exit ();
int QACEApplication::exec (int msec)
ACE_UINT16 port = 60000;
Acceptor *accept;
ACE_NEW_RETURN(accept, Acceptor(ACE_Reactor::instance()), -1);
//调用open函数初始化accept
if(accept->open(port) == -1)
accept->handle_close();
ACE_DEBUG((LM_DEBUG, "main open error\\n"));
return -1;
//运行Reactor管理器的循环调度函数
if(ACE_Reactor::run_event_loop() == -1)
accept->handle_close();
ACE_DEBUG((LM_DEBUG, "Main run event loop error\\n"));
return -1;
accept->handle_close();
return inherited::exec ();
main.cpp
#include "ace/OS_main.h"
#include "qaceapplication.h"
int
ACE_TMAIN (int argc, ACE_TCHAR *argv[])
QACEApplication app(argc, argv);
return app.exec();
运行结果如下:
多谢,亲爱的美美。
以上是关于ACE_TAO 008 Reactor基本原理与说明的主要内容,如果未能解决你的问题,请参考以下文章