Linux----Reactor

Posted 4nc414g0n

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux----Reactor相关的知识,希望对你有一定的参考价值。

Reactor

Reactor

  • Reactor设计模式,是一种基于事件驱动的设计模式,常规的I/O多路复用中采用select和poll、epoll等来实现,Reactor是将上述机制进一步封装,通过回调机制实现。我们只需将事件的接口注册到Reactor上,当事件发生之后,会回调注册的接口

  • Reactor模式本质上指的是使用 IO多路复用+非阻塞IO
    (缓冲区满时,如果将IO设置成非阻塞IO,会直接返回EAGAIN或EWOULDBLOCK, 将IO设置成阻塞IO则会一直阻塞,直到数据全都发送成功再返回)

  • 服务端需要与多个客户端通信,它的通信是一对多的关系,所以它需要使用 Reactor 模式。对客户端,它只需要与服务端通信,它的通信是一对一的关系,所以它不需要使用 Reactor 模式

  • 有单/多Reactor单/多线程多种模型


优点:

  • 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的
  • 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
  • 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源
  • 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性

①单Reactor单线程


单Reactor单线程模型最简单,acceptor接收新连接,再 dispatch到handler
缺点:只有一个线程,因此事件是顺序处理的,一个线程同时只能做一件事情,事件的优先级得不到保证

单Reactor单线程Epoll ET server端实现过程

类的设计:

Event类(私有):

  • 一个文件描述符对应一个事件对象
  • 指针指向所属的Reactor对象,方便调用
  • 私有接收缓冲区,私有发送缓冲区 (要让程序在写操作上不阻塞,必须要为每个tcp连接配置out_buffer_)
  • 三个回调函数(读 写 异常)

Event类(公有):

  • 构造函数
  • 注册回调 函数RegisterCallback(callback_t recv_ ,callback_t send_, callback_t error_)
  • 析构函数 (注意:这里不能直接close(),应该将close逻辑写在DelEvent内部,参见后面AddEvent代码部分)
    (解释:因为在AddEvent函数中进行了unordered_map的insert操作,insert操作中间会产生临时变量,之后会销毁,在销毁的时候调用析构函数,如果析构函数中有close(sock_),直接释放sock_,之后就会出错)
class Event
       public:
           int sock_;//一个文件描述符对应一个事件对象
           Reactor *r_;//指向所属的Reactor对象
       
           string in_buffer_;//私有接收缓冲区
           string out_buffer_;//私有发送缓冲区

           callback_t recv_callback_;//读回调
           callback_t send_callback_;//写回调
           callback_t error_callback_;//异常回调
       public:
           Event()//初始化
               :sock_(-1),
               r_(nullptr)
           
               recv_callback_=nullptr;
               send_callback_=nullptr;
               error_callback_=nullptr;
           

           void RegisterCallback(callback_t recv_ ,callback_t send_, callback_t error_)//注册回调
           
               recv_callback_=recv_;
               send_callback_=send_;
               error_callback_=error_;
           
           ~Event()//注意这里不能直接close(),应该将close逻辑写在DelEvent内部
           //解释:因为在AddEvent函数中进行了unordered_map的insert操作,insert操作中间会产生临时变量,
           //之后会销毁,在销毁的时候调用析构函数,如果析构函数中有close(sock_),直接释放sock_,之后就会出错
;

Reactor类(私有):

  • epoll对象的fd
  • 一个unordered_map用于建立监听的sock和事件的对应关系
private:
  int epfd_;//epoll对象的fd
  unordered_map<int, Event> events_;

Reactor类(公有):

  • 构造函数
  • Reactor初始化ReactorInit()
void ReactorInit()

   epfd_ = epoll_create(128);
   if(epfd_ < 0)
   
       cerr<<"epoll_create error"<<endl;
       exit(1);
   

  • 添加事件AddEvent(const Event &ev, uint32_t events)
    注意:这里的insert操作中间会产生临时变量,导致销毁时调用析构函数,如果析构函数中有close(sock_),直接释放sock_
void AddEvent(const Event &ev, uint32_t events)

   struct epoll_event epev;
   epev.data.fd = ev.sock_;
   epev.events = events;

   if(epoll_ctl(epfd_, EPOLL_CTL_ADD, ev.sock_, &epev)<0)
   
       cerr<<"epoll_ctl error"<<endl;
       return;
   
   else
       events_.insert(ev.sock_, ev);     
   cout<<"events added"<<endl;  

  • 删除事件DelEvent(int sock)
void DelEvent(int sock)

   unordered_map<int, Event>::iterator iter = events_.find(sock);
   if(iter == events_.end())
       return;//没找到
   else
       epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);
       events_.erase(iter);
       close(sock);//sock需要在这里关闭
       cout<<"DelEvent "<<sock<<endl;
   

  • 派发事件DispatchEvent(int timeout)
    过程:创建struct epoll_event数组 -> epoll_wait -> for(int i=0;i<rd_num;i++)循环进行事件就绪判断
    注意:针对两种异常EPOLLERR和EPOLLHUP我们直接交给读写处理,读写会统一交给Errorer
void DispatchEvent(int timeout)

   #define NUM 128
   struct epoll_event epevs[NUM];
   int rd_num = epoll_wait(epfd_, epevs, NUM, timeout);
   for(int i=0;i<rd_num;i++)
   
       int sock = epevs[i].data.fd;
       uint32_t events = epevs[i].events;
       cout<<"sock: "<<sock<<" has data"<<endl;

       //1.事件异常(将所有的异常,全部交给读写处理,读写处理遇到的所有的异常,都会交给Errorer统一处理)
       if(events & EPOLLERR) events |= (EPOLLIN|EPOLLOUT);
       //2.对端关闭链接(将所有的异常,全部交给读写处理,读写处理遇到的所有的异常,都会交给Errorer统一处理)
       if(events & EPOLLHUP ) events |= (EPOLLIN|EPOLLOUT);
       //3.读事件就绪
       if((IsExist(sock))&&(events & EPOLLIN))
       
           if(events_[sock].recv_callback_)//recv_callback_不为nullptr有回调
           
               events_[sock].recv_callback_(events_[sock]);//回调参数Event &
           
       
       //4.写事件就绪
       if((IsExist(sock)) && (events & EPOLLOUT))
       
           //注意需要判断是否存在次sock (可能发生读错误Errorer->DelEvent->进行清楚events_[sock]找不到,出错)
           if(events_[sock].send_callback_)//send_callback_不为nullptr有回调
           
               events_[sock].send_callback_(events_[sock]);//回调参数Event &
           
       
   

  • 使能读写(修改事件)EnableReadWrite(int sock, bool readable, bool writeable)
void EnableReadWrite(int sock, bool readable, bool writeable)

   struct epoll_event ev;
   ev.events = (EPOLLET|(readable? EPOLLIN:0)|(writeable?EPOLLOUT:0));
   ev.data.fd = sock;
   if(0 == epoll_ctl(epfd_, EPOLL_CTL_MOD, sock,&ev))//成功返回0 (修改目标sock的event)
   
       cout<<"sock: "<<sock<<" 修改成功"<<endl;
   

  • 查找某sock是否存在IsExist(int sock)
bool IsExist(int sock)

   unordered_map<int, Event>::iterator iter = events_.find(sock);
   return events_.end()==iter? false:true;

  • 析构关闭epoll对象的fd
~Reactor()

   if(epfd_ >= 0) close(epfd_);

callback函数(??序列化反序列化 && 解包粘包问题)

Recver:

  • 帮助函数static int RecvHelper(int sock, string *out)
    注意:不断的读取recv(sock, &buffer, sizeof(buffer)-1, 0)这里的0是设置非阻塞读取,直到被中断 / 本轮读完 / 读取出错
static int RecvHelper(int sock, string *out)//(注意循环读取)

   while(true)
   
       char buffer[1024];
       ssize_t s = recv(sock, &buffer, sizeof(buffer)-1, 0);//0设置非阻塞读取
       if(s>0)
       
           buffer[s]=0;
           (*out)+=buffer;
       
       else if(s<0)
           if(errno == EINTR)//被中断
               continue;
           else if(errno == EAGAIN || errno == EWOULDBLOCK)
               return 0;//本轮读完返回0
           else 
               return -1;//读取出错返回-1
       
       else
           return -1;//出错返回-1
   

  • 主体void Recver(Event &event)
    注意解包(反序列化的过程中解决粘包问题,例如:分隔符为\\3,…)

如何解决拆包的粘包(以TCP报文为例):(参考:Netty之解决TCP粘包拆包(设置定长消息)

  1. 使用带消息头的协议,消息头存储消息开始标识和消息长度,服务端获取到消息头时,解析出消息长度,然后后后读取该消息内容,如:0000000036“type”:“message”,“content”:“hello”
  2. 设置定长消息,服务端每次读取即定长度为一条消息。报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分
  3. 设置消息边界,服务端按消息边界从网络流中读取内容。比如:“type”:“message”,“content”:“hello”\\n

注意:多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的

void Recver(Event &event)

   //本轮读取完毕return 0
   if(-1 == RecvHelper(event.sock_, &(event.in_buffer_)))//负责读取 (读取出错返回-1)
   
       if(event.error_callback_) event.error_callback_(event);
       return;
   
   //...
   /* 数据分析与处理进行反序列化和序列化,假设得到一个结果是response */
   //...
   //发送的核心:不是这里调用send,只要将报文添加到outbuffer中即可
   event.out_buffer_ += response;
   //多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的
   (event.r_)->EnableReadWrite(event.sock_, true, true);

Sender:

  • 帮助函数static int SenderHelper(int sock, string &recv_buffer)

注意错误写法:(这种写法仍是阻塞IO,未全部读完就阻塞)

while(true)

   ssize_t s = send(sock, start+total, size-total, MSG_DONTWAIT); //MSG_DONTWAIT临时将sockfd设为非阻塞模式
   if(s<0 && errno == EWOULDBLOCK)
   	continue;
   else if(s<0)
       break;
   if(total == size)
       return;//成功返回

正确写法:

static int SenderHelper(int sock, string &recv_buffer)

   int total = 0; //目前我们已经累计发送了所少
   const char *start = recv_buffer.c_str();
   int size = recv_buffer.size();
   while(true)
   
       ssize_t s = send(sock, start+total, size-total, 0); 
       //size-total需要发送的字节数(不一定一次发送完,缓冲区填满),flag(0)
       // 3. 本轮缓冲区足够大,引起我们把所有的数据已经全部发送完毕
       if(s>0)
       
           total+=s;
           if(total==size)
           
               recv_buffer.clear();//清空缓冲区
               return 1;
           
       
       else
           if(errno == EINTR)
               continue;
           else if(errno==EAGAIN || errno == EWOULDBLOCK)
           
               //缓冲区打满代表本轮发送结束
               //先清空这部分的缓冲区
               recv_buffer.erase(0, total);
               return 0;
           
           else
               //出错
               return -1;
           
       
   

  • 主体void Sender(Event &event)
    注意1. 缓冲区满进行下一轮读取,需要继续将EPOLLOUT设为关心2. 发生完毕,需要将写事件关闭
void Sender(Event &event)

   int ret = SenderHelper(event.sock_, event.out_buffer_);
   if(-1 == ret)//出错
       if(event.error_callback_) event.error_callback_(event);
   else if(0 == ret)//缓冲区满,可以继续下一轮
       (event.r_)->EnableReadWrite(event.sock_ ,true ,true);
   else if(1 == ret)//发送完毕,写事件关闭
       (event.r_)->EnableReadWrite(event.sock_ ,true ,false);
   cout<<"Call Sender"<<endl;

Errorer

  • 主体void Errorer(Event &event)
    注意:所有异常最终都交给Errorer处理进行DelEvent
void Errorer(Event &event)

   cout<<"Call Errorer"<<endl;
   (event.r_)->DelEvent(event.sock_);

Acceptor函数

listen_sock_只需要监听读即只需要Recver,所以只需要注册E.RegisterCallback(Accepter, nullptr, nullptr)


void Accepter(Event &event)
连接到来就注册三个回调函数注意要将这些连接设置为非阻塞Sock::SetNonBlock(sock)

void Accepter(Event &event)

   cout<<"Call Accepter"<<endl;
   while(1)// ET工作模式下必须循环读取 (同一时间内可能有很多链接到来)
   
       //accept
       struct sockaddr_in peer;
       socklen_t len = sizeof(peer);
       int sock = accept(event.sock_, (struct sockaddr*)&peer, &len);
       if(sock>0)
       
           //0. 设置fd为非阻塞
           Sock::SetNonBlock(sock);//ET模式下所有IO必须是非阻塞的
           //1.构建新的与sock对应的Event对象
           Event ev;//多个Event对象
           ev.sock_ = sock;//多个sock
           ev.r_ = event.r_;//只有一个Reactor对象

           ev.RegisterCallback(Recver, Sender, Errorer);
           //添加事件 (设为常读(ET))
           (ev.r_)->AddEvent(ev, EPOLLIN | EPOLLET);
        
       //被信号中断(不代表没有连接了)
       else if(errno == EINTR)
       
           continue;
       
       //底层无连接直接break
       else if(errno == EAGAIN || errno == EWOULDBLOCK)
       
           break;
       
       else
       
           cerr<<"accept error"<<endl;
           continue;
       

   

主函数流程

流程框图:

代码如下:
注意解决断开连接TIMEWAIT状态的问题setsockopt()

void Usage(string proc)

   cerr<<"Usage : "<<"\\n\\t"<<proc<<" local_port"<<endl;

int main(int argc, char *argv[])//argv[1]: 端口(server端只需要端口) 

   if(argc != 2)
   
       Usage(argv[0]);
       return 1;
   
   //创建Reactor
   ns_reactor::Reactor* R = new ns_reactor::Reactor();
   R->ReactorInit();

   //创建Sock
   ns_sock::Sock* S = new ns_sock::Sock();
   int listen_sock_ = S->SockInit();
   S->SetNonBlock(listen_sock_);//设为非阻塞(ET模式下所有IO均为非阻塞)
   S->SockBind(listen_sock_, (u_int16_t)atoi(argv[1]));//端口号
   S->SockListen(listen_sock_);

   //创建Event
   ns_reactor::Event E;
   E.sock_ = listen_sock_;
   E.r_ = R;

   //Accepter链接管理器
   E.RegisterCallback(Accepter, nullptr, nullptr);//listen_sock_只需要监听读即recv_
   
   //将E注册进R
   R->AddEvent(E, EPOLLIN|EPOLLET);//ET设为ET模式

   //进入事件派发逻辑, 服务器启动
   int timeout=1000;
   while(1)
   
       R->DispatchEvent(timeout);
   
   return 0;
 

②单Reactor多线程(ThreadPool)


优点:使用线程池来计算,这会充分的利用多核CPU
注意:由于结果次序不一定,设计协议时带一个id标示

③多Reactor单线程(主从Reactor)


参考:Linux下多线程服务器Reactor模式优缺点
main Reactor负责accept连接, 然后把该连接挂在某个sub Reactor中(可以采用round-robin或者随机方法),这样该连接的所有操作都在哪个sub Reactor所处的线程中完成。多个连接可能被分配到多个线程中

  • 当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor
  • subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  • 当有新事件发生时, subreactor 就会调用对应的handler处理…

④多Reactor多线程(主从Reactor+ThreadPool)

同理结合一下
既使用多个 reactors 来处理 IO,又使用线程池来处理计算。此模式适适合既有突发IO(利用Multiple Reactor分担),又有突发计算的应用(利用线程池把一个连接上的计算任务分配给多个线程)

以上是关于Linux----Reactor的主要内容,如果未能解决你的问题,请参考以下文章

如何更新 Ember 模板中辅助主体内的变量

服务主体如何登录到我的 Azure 应用服务?

008-测试对象

每次渲染正文时都会调用 SwiftUI Picker onReceive()

干货-vue 中使用 rxjs 进行非父子组件中传值

01大数据概论