Reactor模式

Posted 2021dragon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactor模式相关的知识,希望对你有一定的参考价值。


文章目录

Reactor模式

Reactor模式的定义

Reactor反应器模式,也叫做分发者模式或通知者模式,是一种将就绪事件派发给对应服务处理程序的事件设计模式。

Reactor模式_c++

Reactor模式的角色构成

Reactor主要由以下五个角色构成:

角色

解释

Handle(句柄)

用于标识不同的事件,本质就是一个文件描述符。

Sychronous Event Demultiplexer(同步事件分离器)

本质就是一个系统调用,用于等待事件的发生。对于Linux来说,同步事件分离器指的就是I/O多路复用,比如select、poll、epoll等。

Event Handler(事件处理器)

由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的处理反馈。

Concrete Event Handler(具体事件处理器)

事件处理器中各个回调方法的具体实现。

Initiation Dispatcher(初始分发器)

初始分发器实际上就是Reactor角色,初始分发器会通过同步事件分离器来等待事件的发生,当对应事件就绪时就调用事件处理器,最后调用对应的回调方法来处理这个事件。

Reactor模式的工作流程

Reactor模式的工作流程如下:

  1. 当应用向初始分发器注册具体事件处理器时,应用会标识出该事件处理器希望初始分发器在某个事件发生时向其通知,该事件与Handle关联。
  2. 初始分发器会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
  3. 当所有的事件处理器注册完毕后,应用会启动初始分发器的事件循环,这时初始分发器会将每个事件处理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。
  4. 当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器。
  5. 初始分发器会将Ready状态的Handle作为key,来寻找其对应的事件处理器。
  6. 初始分发器会调用其对应事件处理器当中对应的回调方法来响应该事件。

epoll ET服务器(Reactor模式)

如果在此之前没有了解过Reactor模式,相信在看了Reactor模式的工作流程后一定是一头雾水,下面我们实现一个Reactor模式下的epoll ET服务器,来感受一下Reactor模式。

设计思路

epoll ET服务器

在epoll ET服务器中,我们需要处理如下几种事件:

  • 读事件:如果是监听套接字的读事件就绪则调用accept函数获取底层的连接,如果是其他套接字的读事件就绪则调用recv函数读取客户端发来的数据。
  • 写事件:写事件就绪则将待发送的数据写入到发送缓冲区当中。
  • 异常事件:当某个套接字的异常事件就绪时我们不做过多处理,直接关闭该套接字。

当epoll ET服务器监测到某一事件就绪后,就会将该事件交给对应的服务处理程序进行处理。

Reactor模式的五个角色

在这个epoll ET服务器中,Reactor模式中的五个角色对应如下:

  • 句柄:文件描述符。
  • 同步事件分离器:I/O多路复用epoll。
  • 事件处理器:包括读回调、写回调和异常回调。
  • 具体事件处理器:读回调、写回调和异常回调的具体实现。
  • 初始分发器:Reactor类当中的Dispatcher函数。

Dispatcher函数要做的就是调用epoll_wait函数等待事件发生,当有事件发生后就将就绪的事件派发给对应的服务处理程序即可。

EventItem类

  • 在Reactor的工作流程中说到,在注册事件处理器时需要将其与Handle关联,本质上就是需要将读回调、写回调和异常回调与某个文件描述符关联起来。
  • 这样做的目的就是为了当某个文件描述符上的事件就绪时可以找到其对应的各种回调函数,进而执行对应的回调方法来处理该事件。

所以我们可以设计一个EventItem类,该类当中的成员就包括一个文件描述符,以及该文件描述符对应的各种回调函数,此外还有一些其他成员,后面实现的时候再做详细论述。

Reactor类

  • 在Reactor的工作流程中说到,当所有事件处理器注册完毕后,会使用同步事件分离器等待这些事件发生,当某个事件处理器的Handle变为Ready状态时,同步事件分离器会通知初始分发器,然后初始分发器会将Ready状态的Handle作为key来寻找其对应的事件处理器,并调用该事件处理器中对应的回调方法来响应该事件。
  • 本质就是当事件注册完毕后,会调用epoll_wait函数来等待这些事件发生,当某个事件就绪时epoll_wait函数会告知调用方,然后调用方就根据就绪的文件描述符来找到其对应的各种回调函数,并调用对应的回调函数进行事件处理。

对此我们可以设计一个Reactor类。

  • 该类当中有一个成员函数叫做Dispatcher,这个函数就是所谓的初始分发器,在该函数内部会调用epoll_wait函数等待事件的发生,当事件发生后会告知Dispatcher已经就绪的事件。
  • 当事件就绪后需要根据就绪的文件描述符来找到其对应的各种回调函数,由于我们会将每个文件描述符及其对应的各种回调都封装到一个EventItem结构当中,所以实际我们就是需要根据文件描述符找到其对应的EventItem结构。
  • 我们可以使用C++ STL当中的unordered_map,来建立各个文件描述符与其对应的EventItem结构之间的映射,这个unordered_map可以作为Reactor类的一个成员变量,当需要找某个文件描述符的EventItem结构时就可以通过该成员变量找到。
  • 当然,Reactor类当中还需要提供成员函数AddEvent和DelEvent,用于向Dispatcher当中注册和删除事件。

此外,在Reactor类当中还有一些其他成员,后面实现的时候再做详细论述。

epoll ET服务器的工作流程

这个epoll ET服务器在Reactor模式下的工作流程如下:

  • 首先epoll ET服务器需要进行套接字的创建、绑定和监听。
  • 然后定义一个Reactor对象并初始化,初始化时要做的就是创建epoll模型。
  • 紧接着需要为监听套接字创建对应的EventItem结构,并调用Reactor类中提供的AddEvent函数将监听套接字添加到epoll模型中,并建立监听套接字与其对应的EventItem结构之间的映射关系。
  • 之后就可以不断调用Reactor类中的Dispatcher函数进行事件派发。

在事件处理过程中,会不断向Dispatcher当中新增或删除事件,而每个事件就绪时都会自动调用其对应的回调函数进行处理,所以我们要做的就是不断调用Dispatcher函数进行事件派发即可。

EventItem结构

EventItem结构中除了包含文件描述符和其对应的读回调、写回调和异常回调之外,还包含一个输入缓冲区inbuffer、一个输出缓冲区outbuffer以及一个回指指针R。

  • 当某个文件描述符的读事件就绪时,我们会调用recv函数读取客户端发来的数据,但我们并不能保证我们读取到了一个完整的报文,因此需要将读取到的数据暂时存放到该文件描述符对应的inbuffer当中,当inbuffer当中可以分离出一个完整的报文后再将其分离出来进行数据处理,这里的inbuffer本质就是用来解决粘包问题的。
  • 当处理完一个报文请求后,需要将响应数据发送给客户端,但我们并不能保证底层TCP的发送缓冲区中有足够的空间供我们写入,因此需要将要发送的数据暂时存放到该文件描述符对应的outbuffer当中,当底层TCP的发送缓冲区中有空间,即写事件就绪时,再依次发送outbuffer当中的数据。
  • EventItem结构当中设置回指指针R,便于快速找到我们定义的Reactor对象,因为后续我们需要根据EventItem结构找到这个Reactor对象。比如当连接事件就绪时,需要调用Reactor类当中的AddEvent函数将其添加到Dispatcher当中。

此外,EventItem结构当中需要提供一个管理回调的成员函数,便于外部对EventItem结构当中的各种回调进行设置。

代码如下:

typedef int(*callback_t)(EventItem*);

class EventItem
public:
int _sock; //文件描述符
Reactor* _R; //回指指针

callback_t _recv_handler; //读回调
callback_t _send_handler; //写回调
callback_t _error_handler; //异常回调

std::string _inbuffer; //输入缓冲区
std::string _outbuffer; //输出缓冲区
public:
EventItem()
: _sock(-1)
, _R(nullptr)
, _recv_handler(nullptr)
, _send_handler(nullptr)
, _error_handler(nullptr)

//管理回调
void ManageCallbacks(callback_t recv_handler, callback_t send_handler, callback_t error_handler)

_recv_handler = recv_handler;
_send_handler = send_handler;
_error_handler = error_handler;

~EventItem()

;

Reactor类

在Reactor类当中有一个unordered_map成员,用于建立文件描述符和与其对应的EventItem结构之间的映射,还有一个epfd成员,该成员是epoll模型对应的文件描述符。

  • 在初始化Reactor对象的时候就可以调用epoll_create函数创建epoll模型,并将该epoll模型对应的文件描述符用epfd成员记录下来,便于后续使用。
  • Reactor对象在析构的时候,需要调用close函数将该epoll模型进行关闭。

代码如下:

#define SIZE 256

class Reactor
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
Reactor()
: _epfd(-1)

void InitReactor()

//创建epoll模型
_epfd = epoll_create(SIZE);
if (_epfd < 0)
std::cerr << "epoll_create error" << std::endl;
exit(5);


~Reactor()

if (_epfd >= 0)
close(_epfd);


;

Dispatcher函数(事件分派器)

Reactor类当中的Dispatcher函数就是之前所说的初始分发器,这里我们更形象的将其称之为事件分派器。

  • 事件分派器要做的就是调用epoll_wait函数等待事件发生。
  • 当某个文件描述符上的事件发生后,先通过unordered_map找到该文件描述符对应的EventItem结构,然后调用EventItem结构当中对应的回调函数对该事件进行处理即可。

代码如下:

#define MAX_NUM 64

class Reactor
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
//事件分派器
void Dispatcher(int timeout)

struct epoll_event revs[MAX_NUM];
int num = epoll_wait(_epfd, revs, MAX_NUM, timeout);
for (int i = 0; i < num; i++)
int sock = revs[i].data.fd; //就绪的文件描述符
if ((revs[i].events&EPOLLERR) || (revs[i].events&EPOLLHUP)) //异常事件就绪(优先处理)
if (_event_items[sock]._error_handler)
_event_items[sock]._error_handler(&_event_items[sock]); //调用异常回调

if (revs[i].events&EPOLLIN) //读事件就绪
if (_event_items[sock]._recv_handler)
_event_items[sock]._recv_handler(&_event_items[sock]); //调用读回调

if (revs[i].events&EPOLLOUT) //写事件就绪
if (_event_items[sock]._send_handler)
_event_items[sock]._send_handler(&_event_items[sock]); //调用写回调



;

说明一下:

  • 这里没有用switch或if语句对epoll_wait函数的返回值进行判断,而是借用for循环对其返回值进行了判断。
  • 如果epoll_wait的返回值为-1则说明epoll_wait函数调用失败,此时不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值为0则说明epoll_wait函数超时返回,此时也不会进入到for循环内部进行事件处理。
  • 如果epoll_wait的返回值大于0则说明epoll_wait函数调用成功,此时才会进入到for循环内部调用对应的回调函数对事件进行处理。
  • 事件处理时最好先对异常事件进行处理,因此代码中将异常事件的判断放在了最前面。

AddEvent函数

Reactor类当中的AddEvent函数是用于进行事件注册的。

  • 在注册事件时需要传入一个文件描述符和一个事件集合,表示需要监视哪个文件描述符上的哪些事件。
  • 还需要传入该文件描述符对应的EventItem结构,表示当该文件描述符上的事件就绪后应该执行的回调方法。
  • 在AddEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符及其对应的事件集合注册到epoll模型当中,然后建立该文件描述符与其对应的EventItem结构的映射关系。

代码如下:

class Reactor
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void AddEvent(int sock, uint32_t event, const EventItem& item)

struct epoll_event ev;
ev.data.fd = sock;
ev.events = event;

if (epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, &ev) < 0) //将该文件描述符添加到epoll模型当中
std::cerr << "epoll_ctl add error, fd: " << sock << std::endl;

else
//建立sock与EventItem结构的映射关系
_event_items.insert( sock, item );
std::cout << "添加: " << sock << " 到epoll模型中,成功" << std::endl;


;

DelEvent函数

Reactor类当中的DelEvent函数是用于进行事件删除的。

  • 在删除事件时只需要传入一个文件描述符即可。
  • 在DelEvent函数内部要做的就是,调用epoll_ctl函数将该文件描述符从epoll模型中删除,并取消该文件描述符与其对应的EventItem结构的映射关系。

代码如下:

class Reactor
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void DelEvent(int sock)

if (epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) < 0) //将该文件描述符从epoll模型中删除
std::cerr << "epoll_ctl del error, fd: " << sock << std::endl;

else
//取消sock与EventItem结构的映射关系
_event_items.erase(sock);
std::cout << "从epoll模型中删除: " << sock << ",成功" << std::endl;


;

EnableReadWrite函数

Reactor类当中的EnableReadWrite函数,用于使能或使能某个文件描述符的读写事件。

  • 调用EnableReadWrite函数时需要传入一个文件描述符,表示需要设置的是哪个文件描述符对应的事件。
  • 还需要传入两个bool值,分别表示需要使能还是使能读写事件。
  • EnableReadWrite函数内部会调用epoll_ctl函数修改将该文件描述符的监听事件。

代码如下:

class Reactor
private:
int _epfd; //epoll模型
std::unordered_map<int, EventItem> _event_items; //建立sock与EventItem结构的映射
public:
void EnableReadWrite(int sock, bool read, bool write)
struct epoll_event ev;
ev.data.fd = sock;
ev.events = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, &ev) < 0) //修改该文件描述符所需要监视的事件
std::cerr << "epoll_ctl mod error, fd: " << sock << std::endl;


;

回调函数

下面我们就可以实现一些回调函数,这里主要实现四个回调函数。

  • accepter:当连接事件到来时可以调用该回调函数获取底层建立好的连接。
  • recver:当读事件就绪时可以调用该回调函数读取客户端发来的数据并进行处理。
  • sender:当写事件就绪时可以调用该回调函数向客户端发送响应数据。
  • errorer:当异常事件就绪时可以调用该函数将对应的文件描述符进行关闭。

当我们为某个文件描述符创建EventItem结构时,就可以调用EventItem类提供的ManageCallbacks函数,将这些回调函数到EventItem结构当中。

  • 我们会将监听套接字对应的EventItem结构当中的recv_handler设置为accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的send_handler和error_handler可以设置为nullptr。
  • 当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的EventItem结构当中的recv_handler回调,此时就会调用accepter回调获取底层建立好的连接。
  • 而对于与客户端建立连接的套接字,我们会将其对应的EventItem结构当中的recv_handler、send_handler和error_handler分别设置为这里的recver、sender和error。
  • 当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的EventItem结构当中对应的回调函数,也就是这里的recver、sender和error。

accepter回调

accepter回调用于处理连接事件,其工作流程如下:

  1. 调用accept函数获取底层建立好的连接。
  2. 将获取到的套接字设置为非阻塞,并为其创建EventItem结构,填充EventItem结构当中的各个字段,并注册该套接字相关的回调方法。
  3. 将该套接字及其对应需要关心的事件注册到Dispatcher当中。

下一次Dispatcher在进行事件派发时就会帮我们关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的EventItem结构中对应的回调方法。

代码如下:

int accepter(EventItem* item)

while (true)
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int sock = accept(item->_sock, (struct sockaddr*)&peer, &len);
if (sock < 0)
if (errno == EAGAIN || errno == EWOULDBLOCK) //并没有读取出错,只是底层没有连接了
return 0;

else if (errno == EINTR) //读取的过程被信号中断了
continue;

else //获取连接失败
std::cerr << "accept error" << std::endl;
return -1;


SetNonBlock(sock); //将该套接字设置为非阻塞
//构建EventItem结构
EventItem sock_item;
sock_item._sock = sock;
sock_item._R = item->_R;
sock_item.ManageCallbacks(recver, sender, errorer); //注册回调方法

Reactor* R = item->_R;
R->AddEvent(sock, EPOLLIN | EPOLLET, sock_item); //将该套接字及其对应的事件注册到Dispatcher中

return 0;

需要注意的是,因为这里实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞。

  • 因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,如果没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了,所以需要循环多次调用accept函数获取底层建立好的连接。
  • 循环调用accept函数也就意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就会返回,而不会被阻塞住。

accept获取到的新的套接字也需要设置为非阻塞,就是为了避免将来循环调用recv、send等函数时被阻塞。

设置文件描述符为非阻塞

设置文件描述符为非阻塞时,需要先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记​​O_NONBLOCK​​,最后调用fcntl函数对该文件描述符的状态标记进行设置即可。

代码如下:

//设置文件描述符为非阻塞
bool SetNonBlock(int sock)

int fl = fcntl(sock, F_GETFL);
if (fl < 0)
std::cerr << "fcntl error" << std::endl;
return false;

fcntl(sock, F_SETFL, fl | O_NONBLOCK);
return true;

监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需要继续判断错误码。

  • 如果错误码为​​EAGAIN​​​或​​EWOULDBLOCK​​,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时我们可以返回0,表示本次accepter调用成功。
  • 如果错误码为​​EINTR​​,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。
  • 除此之外,才说明accept函数是真正调用失败了,这时我们可以返回-1,表示本次accepter调用失败。

accept、recv和send等IO系统调用为什么会被信号中断?

IO系统调用函数出错返回并且将错误码设置为​​EINTR​​,表明本次在进行数据读取或数据写入之前被信号中断了,也就是说IO系统调用在陷入内核,但并没有返回用户态的时候内核跑去处理其他信号了。

  • 在内核态返回用户态之前会检查信号的pending位图,也就是未决信号集,如果pending位图中有未处理的信号,那么内核就会对该信号进行处理。
  • 但IO系统调用函数在进行IO操作之前就被信号中断了,这实际上是一个特例,因为IO过程分为“等”和“拷贝”两个步骤,而一般“等”的过程比较漫长,而在这个过程中我们的执行流其实是处于闲置的状态的,因此在“等”的过程中如果有信号产生,内核就会立即进行信号的处理。

写事件是按需打开的

这里调用accept获取上来的套接字在添加到Dispatcher中时,只添加了​​EOPLLIN​​​和​​EPOLLET​​事件,也就是说只让epoll帮我们关心该套接字的读事件。

  • 这里之所以没有添加写事件,是因为当前我们并没有要发送的数据,因此没有必要让epoll帮我们关心写事件。
  • 一般读事件是经常会被设置的,而写事件则是按序打开的,只有当我们有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭。

recver回调

recver回调用于处理读事件,其工作流程如下:

  1. 循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应EventItem结构的inbuffer当中。
  2. 对inbuffer当中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer当中。
  3. 对切割出来的完整报文进行反序列化。
  4. 业务处理。
  5. 业务处理后形成响应报文。
  6. 将响应报头添加到对应EventItem结构的outbuffer当中,并打开写事件。

下一次Dispatcher在进行事件派发时就会帮我们关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的EventItem结构中写回调方法,进而将outbuffer中的响应数据发送给客户端。

代码如下:

int recver(EventItem* item)

if (item->_sock < 0) //该文件描述符已经被关闭
return -1;
//1、数据读取
if (recver_helper(item->_sock, &(item->_inbuffer)) < 0) //读取失败
item->_error_handler(item);
return -1;


//2、报文切割
std::vector<std::string> datagrams;
StringUtil::Split(item->_inbuffer, &datagrams, "X");

for (auto s : datagrams)
//3、反序列化
struct data d;
StringUtil::Deserialize(s, &d._x, &d._y, &d._op);

//4、业务处理
int result = 0;
switch (d._op)

case +:
result = d._x + d._y;
break;
case -:
result = d._x - d._y;
break;
case *:
result = d._x * d._y;
break;
case /:
if (d._y == 0)
std::cerr << "Error: div zero!" << std::endl;
continue; //继续处理下一个报文

else
result = d._x / d._y;

break;
case %:
if (d._y == 0)
std::cerr << "Error: mod zero!" << std::endl;
continue; //继续处理下一个报文

else
result = d._x % d._y;

break;
default:
std::cerr << "operation error!" << std::endl;
continue; //继续处理下一个报文


//5、形成响应报文
std::string response;
response += std::to_string(d._x);
response += d._op;
response += std::to_string(d._y);
response += "=";
response += std::to_string(result);
response += "X"; //报文与报文之间的分隔符

//6、将响应报文添加到outbuffer中
item->_outbuffer += response;
if (!item->_outbuffer.empty())
item->_R->EnableReadWrite(item->_sock, true, true); //打开写事件

return 0;

一、数据读取

我们可以将循环调用recv函数读取数据的过程封装成一个recver_helper函数。

  • recver_helper函数要做的就是循环调用recv函数将读取到的数据添加到inbuffer当中。
  • 当recv函数的返回值小于0时同样需要进一步判断错误码,如果错误码为​​EAGAIN​​​或​​EWOULDBLOCK​​​则说明底层数据读取完毕了,如果错误码为​​EINTR​​则说明读取过程被信号中断了,此时还需要继续调用recv函数进行读取,否则就是读取出错了。
  • 当读取出错时直接调用该套接字对应的error_handler回调,最终就会调用到下面将要实现的errorer回调,在我们会在errorer回调当中将该套接字进行关闭。

代码如下:

int recver_helper(int sock, std::string* out)

while (true)
char buffer[128];
ssize_t size = recv(sock, buffer, sizeof(buffer)-1, 0);
if (size < 0)
if (errno == EAGAIN || errno == EWOULDBLOCK) //数据读取完毕
return 0;

else if (errno == EINTR) //被信号中断,继续尝试读取
continue;

else //读取出错
return -1;


else if (size == 0) //对端连接关闭
return -1;

//读取成功
buffer[size] = \\0;
以上是关于Reactor模式的主要内容,如果未能解决你的问题,请参考以下文章

Proactor和Reactor模型

Proactor和Reactor模型

muduo网络库学习笔记:Reactor模式的关键结构

EDA风格与Reactor模式

muduo源代码分析--Reactor模式在muduo中的使用

JAVA设计模式——观察者设计模式和Reactor反应堆设计模式