Linux----Reactor
Posted 4nc414g0n
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux----Reactor相关的知识,希望对你有一定的参考价值。
Reactor
Reactor
Reactor设计模式,是一种基于事件驱动的设计模式,常规的I/O多路复用中采用select和poll、epoll等来实现,Reactor是将上述机制进一步封装,
通过回调机制实现
。我们只需将事件的接口注册到Reactor上,当事件发生之后,会回调注册的接口
Reactor模式本质上指的是使用
IO多路复用
+非阻塞IO
(缓冲区满时,如果将IO设置成非阻塞IO,会直接返回EAGAIN或EWOULDBLOCK, 将IO设置成阻塞IO则会一直阻塞,直到数据全都发送成功再返回)
服务端
需要与多个客户端通信,它的通信是一对多的关系,所以它需要使用 Reactor 模式
。对客户端,它只需要与服务端通信,它的通信是一对一的关系,所以它不需要使用 Reactor 模式有单/多Reactor单/多线程多种模型
优点:
- 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
- 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源
- 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性
①单Reactor单线程
单Reactor单线程模型最简单,acceptor接收新连接,再 dispatch到handler
缺点
:只有一个线程,因此事件是顺序处理的,一个线程同时只能做一件事情,事件的优先级得不到保证
单Reactor单线程Epoll ET server端实现过程
类的设计:
Event类(私有):
- 一个文件描述符对应一个事件对象
- 指针指向所属的Reactor对象,方便调用
- 私有接收缓冲区,私有发送缓冲区 (
要让程序在写操作上不阻塞,必须要为每个tcp连接配置out_buffer_
)- 三个回调函数(读 写 异常)
Event类(公有):
- 构造函数
注册回调
函数RegisterCallback(callback_t recv_ ,callback_t send_, callback_t error_)
- 析构函数 (
注意:这里不能直接close(),应该将close逻辑写在DelEvent内部,参见后面AddEvent代码部分
)
(解释
:因为在AddEvent函数中进行了unordered_map的insert操作,insert操作中间会产生临时变量,之后会销毁,在销毁的时候调用析构函数,如果析构函数中有close(sock_),直接释放sock_,之后就会出错)class Event public: int sock_;//一个文件描述符对应一个事件对象 Reactor *r_;//指向所属的Reactor对象 string in_buffer_;//私有接收缓冲区 string out_buffer_;//私有发送缓冲区 callback_t recv_callback_;//读回调 callback_t send_callback_;//写回调 callback_t error_callback_;//异常回调 public: Event()//初始化 :sock_(-1), r_(nullptr) recv_callback_=nullptr; send_callback_=nullptr; error_callback_=nullptr; void RegisterCallback(callback_t recv_ ,callback_t send_, callback_t error_)//注册回调 recv_callback_=recv_; send_callback_=send_; error_callback_=error_; ~Event()//注意这里不能直接close(),应该将close逻辑写在DelEvent内部 //解释:因为在AddEvent函数中进行了unordered_map的insert操作,insert操作中间会产生临时变量, //之后会销毁,在销毁的时候调用析构函数,如果析构函数中有close(sock_),直接释放sock_,之后就会出错 ;
Reactor类(私有):
- epoll对象的fd
- 一个unordered_map用于建立监听的sock和事件的对应关系
private: int epfd_;//epoll对象的fd unordered_map<int, Event> events_;
Reactor类(公有):
- 构造函数
- Reactor初始化ReactorInit()
void ReactorInit() epfd_ = epoll_create(128); if(epfd_ < 0) cerr<<"epoll_create error"<<endl; exit(1);
- 添加事件AddEvent(const Event &ev, uint32_t events)
注意:这里的insert操作中间会产生临时变量,导致销毁时调用析构函数,如果析构函数中有close(sock_),直接释放sock_
void AddEvent(const Event &ev, uint32_t events) struct epoll_event epev; epev.data.fd = ev.sock_; epev.events = events; if(epoll_ctl(epfd_, EPOLL_CTL_ADD, ev.sock_, &epev)<0) cerr<<"epoll_ctl error"<<endl; return; else events_.insert(ev.sock_, ev); cout<<"events added"<<endl;
- 删除事件DelEvent(int sock)
void DelEvent(int sock) unordered_map<int, Event>::iterator iter = events_.find(sock); if(iter == events_.end()) return;//没找到 else epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr); events_.erase(iter); close(sock);//sock需要在这里关闭 cout<<"DelEvent "<<sock<<endl;
- 派发事件DispatchEvent(int timeout)
过程:创建struct epoll_event数组
->epoll_wait
->for(int i=0;i<rd_num;i++)循环进行事件就绪判断
注意:针对两种异常EPOLLERR和EPOLLHUP我们直接交给读写处理,读写会统一交给Errorer
void DispatchEvent(int timeout) #define NUM 128 struct epoll_event epevs[NUM]; int rd_num = epoll_wait(epfd_, epevs, NUM, timeout); for(int i=0;i<rd_num;i++) int sock = epevs[i].data.fd; uint32_t events = epevs[i].events; cout<<"sock: "<<sock<<" has data"<<endl; //1.事件异常(将所有的异常,全部交给读写处理,读写处理遇到的所有的异常,都会交给Errorer统一处理) if(events & EPOLLERR) events |= (EPOLLIN|EPOLLOUT); //2.对端关闭链接(将所有的异常,全部交给读写处理,读写处理遇到的所有的异常,都会交给Errorer统一处理) if(events & EPOLLHUP ) events |= (EPOLLIN|EPOLLOUT); //3.读事件就绪 if((IsExist(sock))&&(events & EPOLLIN)) if(events_[sock].recv_callback_)//recv_callback_不为nullptr有回调 events_[sock].recv_callback_(events_[sock]);//回调参数Event & //4.写事件就绪 if((IsExist(sock)) && (events & EPOLLOUT)) //注意需要判断是否存在次sock (可能发生读错误Errorer->DelEvent->进行清楚events_[sock]找不到,出错) if(events_[sock].send_callback_)//send_callback_不为nullptr有回调 events_[sock].send_callback_(events_[sock]);//回调参数Event &
- 使能读写(修改事件)EnableReadWrite(int sock, bool readable, bool writeable)
void EnableReadWrite(int sock, bool readable, bool writeable) struct epoll_event ev; ev.events = (EPOLLET|(readable? EPOLLIN:0)|(writeable?EPOLLOUT:0)); ev.data.fd = sock; if(0 == epoll_ctl(epfd_, EPOLL_CTL_MOD, sock,&ev))//成功返回0 (修改目标sock的event) cout<<"sock: "<<sock<<" 修改成功"<<endl;
- 查找某sock是否存在IsExist(int sock)
bool IsExist(int sock) unordered_map<int, Event>::iterator iter = events_.find(sock); return events_.end()==iter? false:true;
- 析构关闭epoll对象的fd
~Reactor() if(epfd_ >= 0) close(epfd_);
callback函数(??序列化反序列化 && 解包粘包问题)
Recver:
帮助函数static int RecvHelper(int sock, string *out)
注意
:不断的读取recv(sock, &buffer, sizeof(buffer)-1, 0)这里的0是设置非阻塞读取
,直到被中断 / 本轮读完 / 读取出错static int RecvHelper(int sock, string *out)//(注意循环读取) while(true) char buffer[1024]; ssize_t s = recv(sock, &buffer, sizeof(buffer)-1, 0);//0设置非阻塞读取 if(s>0) buffer[s]=0; (*out)+=buffer; else if(s<0) if(errno == EINTR)//被中断 continue; else if(errno == EAGAIN || errno == EWOULDBLOCK) return 0;//本轮读完返回0 else return -1;//读取出错返回-1 else return -1;//出错返回-1
主体void Recver(Event &event)
注意解包(反序列化的过程中解决粘包问题,例如:分隔符为\\3,…)
如何解决拆包的粘包(以TCP报文为例)
:(参考:Netty之解决TCP粘包拆包(设置定长消息))
- 使用带消息头的协议,消息头存储消息开始标识和消息长度,服务端获取到消息头时,解析出消息长度,然后后后读取该消息内容,如:0000000036“type”:“message”,“content”:“hello”
- 设置定长消息,服务端每次读取即定长度为一条消息。报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分
- 设置消息边界,服务端按消息边界从网络流中读取内容。比如:“type”:“message”,“content”:“hello”\\n
注意:
多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的
void Recver(Event &event) //本轮读取完毕return 0 if(-1 == RecvHelper(event.sock_, &(event.in_buffer_)))//负责读取 (读取出错返回-1) if(event.error_callback_) event.error_callback_(event); return; //... /* 数据分析与处理进行反序列化和序列化,假设得到一个结果是response */ //... //发送的核心:不是这里调用send,只要将报文添加到outbuffer中即可 event.out_buffer_ += response; //多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的 (event.r_)->EnableReadWrite(event.sock_, true, true);
Sender:
帮助函数static int SenderHelper(int sock, string &recv_buffer)
注意错误写法:
(这种写法仍是阻塞IO,未全部读完就阻塞)while(true) ssize_t s = send(sock, start+total, size-total, MSG_DONTWAIT); //MSG_DONTWAIT临时将sockfd设为非阻塞模式 if(s<0 && errno == EWOULDBLOCK) continue; else if(s<0) break; if(total == size) return;//成功返回
正确写法:
static int SenderHelper(int sock, string &recv_buffer) int total = 0; //目前我们已经累计发送了所少 const char *start = recv_buffer.c_str(); int size = recv_buffer.size(); while(true) ssize_t s = send(sock, start+total, size-total, 0); //size-total需要发送的字节数(不一定一次发送完,缓冲区填满),flag(0) // 3. 本轮缓冲区足够大,引起我们把所有的数据已经全部发送完毕 if(s>0) total+=s; if(total==size) recv_buffer.clear();//清空缓冲区 return 1; else if(errno == EINTR) continue; else if(errno==EAGAIN || errno == EWOULDBLOCK) //缓冲区打满代表本轮发送结束 //先清空这部分的缓冲区 recv_buffer.erase(0, total); return 0; else //出错 return -1;
主体void Sender(Event &event)
注意
:1. 缓冲区满进行下一轮读取,需要继续将EPOLLOUT设为关心
,2. 发生完毕,需要将写事件关闭
void Sender(Event &event) int ret = SenderHelper(event.sock_, event.out_buffer_); if(-1 == ret)//出错 if(event.error_callback_) event.error_callback_(event); else if(0 == ret)//缓冲区满,可以继续下一轮 (event.r_)->EnableReadWrite(event.sock_ ,true ,true); else if(1 == ret)//发送完毕,写事件关闭 (event.r_)->EnableReadWrite(event.sock_ ,true ,false); cout<<"Call Sender"<<endl;
Errorer
主体void Errorer(Event &event)
注意:所有异常最终都交给Errorer处理进行DelEvent
void Errorer(Event &event) cout<<"Call Errorer"<<endl; (event.r_)->DelEvent(event.sock_);
Acceptor函数
listen_sock_只需要监听读即只需要Recver,所以只需要注册E.RegisterCallback(Accepter, nullptr, nullptr)
void Accepter(Event &event)
连接到来就注册三个回调函数
,注意要将这些连接设置为非阻塞Sock::SetNonBlock(sock)
void Accepter(Event &event) cout<<"Call Accepter"<<endl; while(1)// ET工作模式下必须循环读取 (同一时间内可能有很多链接到来) //accept struct sockaddr_in peer; socklen_t len = sizeof(peer); int sock = accept(event.sock_, (struct sockaddr*)&peer, &len); if(sock>0) //0. 设置fd为非阻塞 Sock::SetNonBlock(sock);//ET模式下所有IO必须是非阻塞的 //1.构建新的与sock对应的Event对象 Event ev;//多个Event对象 ev.sock_ = sock;//多个sock ev.r_ = event.r_;//只有一个Reactor对象 ev.RegisterCallback(Recver, Sender, Errorer); //添加事件 (设为常读(ET)) (ev.r_)->AddEvent(ev, EPOLLIN | EPOLLET); //被信号中断(不代表没有连接了) else if(errno == EINTR) continue; //底层无连接直接break else if(errno == EAGAIN || errno == EWOULDBLOCK) break; else cerr<<"accept error"<<endl; continue;
主函数流程
流程框图:
代码如下:
注意解决断开连接TIMEWAIT状态的问题
setsockopt()void Usage(string proc) cerr<<"Usage : "<<"\\n\\t"<<proc<<" local_port"<<endl; int main(int argc, char *argv[])//argv[1]: 端口(server端只需要端口) if(argc != 2) Usage(argv[0]); return 1; //创建Reactor ns_reactor::Reactor* R = new ns_reactor::Reactor(); R->ReactorInit(); //创建Sock ns_sock::Sock* S = new ns_sock::Sock(); int listen_sock_ = S->SockInit(); S->SetNonBlock(listen_sock_);//设为非阻塞(ET模式下所有IO均为非阻塞) S->SockBind(listen_sock_, (u_int16_t)atoi(argv[1]));//端口号 S->SockListen(listen_sock_); //创建Event ns_reactor::Event E; E.sock_ = listen_sock_; E.r_ = R; //Accepter链接管理器 E.RegisterCallback(Accepter, nullptr, nullptr);//listen_sock_只需要监听读即recv_ //将E注册进R R->AddEvent(E, EPOLLIN|EPOLLET);//ET设为ET模式 //进入事件派发逻辑, 服务器启动 int timeout=1000; while(1) R->DispatchEvent(timeout); return 0;
②单Reactor多线程(ThreadPool)
优点
:使用线程池来计算,这会充分的利用多核CPU
注意
:由于结果次序不一定,设计协议时带一个id标示
③多Reactor单线程(主从Reactor)
参考:Linux下多线程服务器Reactor模式优缺点
main Reactor负责accept连接, 然后把该连接挂在某个sub Reactor中(可以采用round-robin或者随机方法),这样该连接的所有操作都在哪个sub Reactor所处的线程中完成。多个连接可能被分配到多个线程中
- 当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor
- subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
- 当有新事件发生时, subreactor 就会调用对应的handler处理…
④多Reactor多线程(主从Reactor+ThreadPool)
同理结合一下
既使用多个 reactors 来处理 IO,又使用线程池来处理计算。此模式适适合既有突发IO(利用Multiple Reactor分担),又有突发计算的应用(利用线程池把一个连接上的计算任务分配给多个线程)
以上是关于Linux----Reactor的主要内容,如果未能解决你的问题,请参考以下文章