多路转接reactor epoll ET 简单的英译汉服务
Posted 米兰的小铁匠2333
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多路转接reactor epoll ET 简单的英译汉服务相关的知识,希望对你有一定的参考价值。
文章目录
- 记得点赞哦
- 先举一个简单的例子吧
- 💕工作流程(必看)
- 每一个文件描述符 sock 对应的 管理链接(服务)Connection
- main.cc
- Protocol.hpp
- Tcpserver.hpp
- Log.hpp
- Epoller.hpp
- service.hpp
- Sock.hpp
- Util.hpp
- 操作手册
记得点赞哦
点个赞呗 点个赞呗 点个赞呗
先举一个简单的例子吧
可以类比一下打地鼠的例子
地鼠出来了 自己就知道要 做什么
reactor 在等待事件的到来,如果有事件那就调用相应的函数 来处理 。
在这个过程中需要 为每一个文件描述符分配一个 管理链接(服务)的 对象 就像去办理业务 每个人有一个对应的工作人员为自己提供服务一样,
但是具体提供服务 还是在营业厅里面用营业厅提供好的工具来提供服务
这个connection对象要调用Tcpserver提供好的函数 换句话来说 connection 用来分别是具体为哪个客户端提供服务
当然Tcpserver也需要一个 unordered_map 来存储 文件描述符和对应的管理链接(服务)的对象
💕工作流程(必看)
- main.cc 先创建一个 Tcpserver 对象
- Tcpserver构造函数把需要的数据先初始化好
创建listensock_ 套接字 绑定 监听
调用 AddConnection 函数(因为是ET模式将listensock_设置为非阻塞
)
为listensock_ 创建一个Connection对象这个Connection对象中的 recver_ 函数是Accepter 因为listensock_ 的EPOLLIN的工作是 accept
将listensock_ 和 自己对应的Connection对象指针 添加到TcpServer unordered_map 中
在这里插入代码片
TcpServer(callback_t cb, uint16_t port = 8080) : epfd_(-1), listensock_(-1), cb_(cb)
recvs_ = new struct epoll_event[rec_num];
// 网络功能
listensock_ = Sock::Socket();
Util::SetNonBlock(listensock_); // 设置成非阻塞
Sock::Bind(listensock_, port);
Sock::Listen(listensock_);
epfd_ = Epoller::CreateEpoller();
AddConnection(listensock_, EPOLLIN | EPOLLET,
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
int Accepter(Connection *con)
while (true)
std::string clientip;
uint16_t clientport = 0;
std::cout << "clientip" << clientip << std::endl;
std::cout << "client port" << clientport << std::endl;
int sockfd = Sock::Accept(con->sock_, &clientip, &clientport);
std::cout << "clientip" << clientip << std::endl;
std::cout << "client port" << clientport << std::endl;
if (sockfd < 0)
if (errno == EINTR)
std::cout << "继续链接" << std::endl;
continue;
else if (errno == EAGAIN || errno == EWOULDBLOCK) // 没有可以读取的数据了
std::cout << "没链接要溜了" << std::endl;
break;
else
logMessage(WARNING, "accept error");
return -1;
logMessage(DEBUG, "get a new link: %d", sockfd);
// 注意:默认我们只设置了让epoll帮我们关心读事件,没有关心写事件
// 为什么没有关注写事件:因为最开始的时候,写空间一定是就绪的!
// 运行中可能会存在条件不满足 -- 写空间被写满了
AddConnection(sockfd, EPOLLIN | EPOLLET,
std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),
std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),
std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));
std::cout << " accepting" << std::endl;
return 0;
- 开始 RUN 执行Epoller::LoopOnce(epfd_, recvs_, rec_num)函数 来获取就绪队列的大小
- 开始遍历就绪队列
- 如果有新链接到来 那么 就调用AddConnection 函数(
因为是ET模式将lsock设置为非阻塞
) 并且把Tcpserver 写好的 recver send except 函数传给新创建的connection对象 - 如果revent 是 EPOLLHUP 或者EPOLLERR 那就把 读事件和写事件都打开 然后 运行到 recver函数的时候 会调用 excepter 函数
- 如果普通套接字有读事件就绪 那就调用recver 函数 ,如果出错就会调用excepter函数,读取完毕之后就 执行数据解析 数据解析完毕之后就执行服务 服务执行完之后就对要发送的数据序列化 将序列化好的数据 存到 outbuffer里如果outbuffer不为空 将这个connection对象对应的套接字的EPOLLOUT也打开,如果为空那就 关闭EPOLLOUT
- 如果普通套接字有写事件就绪 那就调用sender函数,如果出错就会调用excepter函数
if (revent & EPOLLIN)
if (IsExits(sock) && connections_[sock]->recver_) // 在unordered map中才执行后续操作
std::cout << "读取一次" << std::endl;
int n = connections_[sock]->recver_(connections_[sock]);
std::cout << "recver 返回值" << n << std::endl;
void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter)
if (event & EPOLLET)
Util::SetNonBlock(sockfd);
Epoller::AddEvent(epfd_, sockfd, event);
Connection *con = new Connection(sockfd, this);
con->SetRecver(recver);
con->SetSendver(sender);
con->SetExcepter(excepter);
connections_.emplace(sockfd, con);
logMessage(DEBUG, "添加新链接到connections成功: %d", sockfd);
std::cout << sockfd << " connection: " << connections_[sockfd] << std::endl;
每一个文件描述符 sock 对应的 管理链接(服务)Connection
-
using func_t = std::function<int(Connection *)>; int sock_;
仿函数类型 参数是 Connection * 返回值是int
是 回调函数func_t recver_;
用来处理 EPOLLIN 事件 比如 有新链接到来 或者 客户端发送消息 服务端要读取func_t sender_;
用来处理 EPOLLOUT 事件 比如 服务端要给客户端发送消息func_t excepter_;
用来处理异常事件
-
TcpServer *R_;
指向 给自己提供服务的 Tcpserver -
std::string inbuffer_;
服务端把从 sock 读取的数据放在这个inbuffer 里面 -
std::string outbuffer_;
服务端把要发送给客户端的数据存到这个outbuffer里 然后发送
using func_t = std::function<int(Connection *)>;
using callback_t = std::function<int(Connection *, std::string &)>;
class Connection
public:
Connection(int sock, TcpServer *r) : sock_(sock), R_(r)
void SetRecver(func_t recver)
std::cout << "设置recver成功" << std::endl;
recver_ = recver;
void SetSendver(func_t sender)
std::cout << "设置sender成功" << std::endl;
sender_ = sender;
void SetExcepter(func_t excepter)
std::cout << "设置excepter成功" << std::endl;
excepter_ = excepter;
;
~Connection()
int sock_;
TcpServer *R_;
std::string inbuffer_;
std::string outbuffer_;
func_t recver_;
func_t sender_;
func_t excepter_;
; // 每一个文件描述符都有对应的链接服务
main.cc
完整代码
#include"TcpServer.hpp"
#include<memory>
using namespace std;
#include"service.hpp"
int BeginHandler(Connection *conn, std::string &message, service_t service)
// 我们能保证,message一定是一个完整的报文,因为我们已经对它进行了解码
Request req;
// 反序列化,进行处理的问题
if (!Parser(message, &req))
// 写回错误消息
return -1;
// 可以直接关闭连接
// conn->excepter_(conn);
// 业务逻辑
Response resp = service(req);
//std::cout << req.x << " " << req.op << " " << req.y << std::endl;
//std::cout << resp.code << " " << resp.result << std::endl;
// 序列化
std::string sendstr;
Serialize(resp, &sendstr);
// 处理完毕的结果,发送回给client
conn->outbuffer_ += sendstr;
conn->sender_(conn);
if(conn->outbuffer_.empty()) conn->R_->EnableReadWrite(conn->sock_, true, false);
else conn->R_->EnableReadWrite(conn->sock_, true, true);
// conn->R_->EnableReadWrite(conn->sock_, true, true);
// conn->sender_()
// 能不能直接调用send方法呢?
// 谈谈多路转接的发送问题?
std::cout << "这里就是上次的业务逻辑啦 --- end" << std::endl;
// 如果我们处理完了结果了,如何返回呢???
// conn->outbuffer_ += result;
// epoll
return 0;
// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
return BeginHandler(conn, message, Translator);
int main()
unique_ptr<TcpServer> svr(new TcpServer(HandlerRequest,8080));
svr->Run();
return 0;
Protocol.hpp
完整代码
#pragma once
#include <iostream>
#include <vector>
#include <cstring>
#include <string>
#include <cstdio>
#include <boost/algorithm/string/case_conv.hpp>
#define SEP ' ' // 单词的话就应该拿空格做分割
#define SEP_LEN sizeof(SEP)
#define CRLF "\\r\\n"
#define CRLF_LEN strlen(CRLF) // 坑:sizeof(CRLF)
#define SPACE " "
#define SPACE_LEN strlen(SPACE)
// bbb cc
// recv 中读取的数据存到了 inbuffer 里面然后进行解析
// 并且存到 result 中
// 这个服务是英汉互译的 所以得拿空格做分割符
// 把数据全部转换成小写
void PackageSplit(std::string &inbuffer, std::vector<std::string> *result)
while (true)
std::size_t pos_n = inbuffer.find('\\n');
std::cout << "pos_n xxx: " << pos_n << std::endl;
if (pos_n != std::string::npos && pos_n < inbuffer.size())
inbuffer.erase(inbuffer.begin() + pos_n);
std::cout << "走到这一步开始找 r " << pos_n << std::endl;
std::size_t pos_r = inbuffer.find('\\r');
std::cout << "pos_r: " << pos_r << std::endl;
if (pos_r != std::string::npos && pos_r < inbuffer.size())
inbuffer.erase(inbuffer.begin() + pos_r);
std::size_t pos = inbuffer.find(SEP);//去掉 \\r \\n 之后再去空格
if (pos == std::string::npos)
break;
std::string word;
// if (inbuffer[0] == '\\n')
// word = inbuffer.substr(1, pos);
// else
word = inbuffer.substr(0, pos);
boost::to_lower(word);
std::cout << "转换成小写 word: " << word << std::endl;
result->emplace_back(word);
inbuffer.erase(0, pos + SEP_LEN);
struct Request // 请求
std::string word;
;
struct Response // 响应
int code;
std::string result;
;
bool Parser(std::string &in, Request *req) // 将请求转换成指定的格式
req->word = in;
return true;
void Serialize(const Response &resp, std::string *out) // 将响应 转换为指定的字符串格式
// "exitCode_ result_"
std::string ec = std::to_string(resp.code);
std::string res = resp.result;
*out = ec;
*out += SPACE;
*out += res;
*out += CRLF;
Tcpserver.hpp
需要注意的小细节
注意看注释部分↓
完整代码
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Sock.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "Util.hpp"
#include "Protocol.hpp"
class Connection;
class TcpServer;
using func_t = std::function<int(Connection *)>;
using callback_t = std::function<int(Connection *, std::string &)>;
class Connection
public:
Connection(int sock, TcpServer *r) : sock_(sock), R_(r)
void SetRecver(func_t recver)
std::cout << "设置recver成功" << std::endl;
recver_ = recver;
void SetSendver(func_t sender)
std::cout << "设置sender成功" << std::endl;
sender_ = sender;
void SetExcepter(func_t excepter)
std::cout << "设置excepter成功" << std::endl;
excepter_ = excepter;
;
~Connection()
int sock_;
TcpServer *R_;
std::string inbuffer_;
std::string outbuffer_;
func_t recver_;
func_t sender_;
func_t excepter_;
; // 每一个文件描述符都有对应的链接服务
class TcpServer
public:
TcpServer(callback_t cb, uint16_t port = 8080) : epfd_(-1), listensock_(-1), cb_(cb)
recvs_ = new struct epoll_event[rec_num];
// 网络功能
listensock_ = Sock::Socket();
Util::SetNonBlock(listensock_); // 设置成非阻塞
Sock::Bind(listensock_, port);
Sock::Listen(listensock_);
epfd_ = Epoller::CreateEpoller();
AddConnection(listensock_, EPOLLIN | EPOLLET,
std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
void AddConnection(int sockfd, uint32_t event目录
- 💂 个人主页:努力学习的少年
- 🤟 版权: 本文由【努力学习的少年】原创、在CSDN首发、需要转载请联系博主
- 💬 如果文章对你有帮助、欢迎关注、点赞、收藏(一键三连)和订阅专栏哦
一. epoll简介
epoll的功能一样跟select和poll一样,都是用来检测文件描述符中的事件是否就绪,当有事件就绪,可以通知给应用层,上层调用 read,recv,write,send 等类似接口就不会被阻塞。
我们之前学过select,poll应该知道,select 和 poll 有如下缺陷:
- 它们需要 额外创建数组保存文件描述符,每一次检测时候,都需要将数组中的文件描述重新设置设置进 文件描述符集 中。
- 除此之外,调用select,epoll检测文件描述符集是否有文件描述符事件就的事件复杂度是
- O(N),因为内核需要依次检测文件描述符集中每个文件符的事件是否就绪。
- select中的文件描述集能够设置的文件描述符是有限的.
epoll通过两方面就很好的解决了select和epoll的缺陷
- 第一, epoll在内核中使用 红黑树 来 跟踪进程所有待检测的文件描述符,把需要监 控socket通过epoll_ctl函数加入到内核的红黑树里,红黑树是个高效的数据结构,它的增删查改的时间复杂度是O(logN),当需要进行加入某个文件描述符进行跟踪检测,需要epoll_ctl接口将文件描述符到红黑树中,添加到红黑树的文件描述符则会不断的进行检测,如果想取消 epoll跟踪的检测某个文件描述符,则也可以使用epoll_ctl接口将红黑树中相对应的节点给删除掉.
- 第二,epoll使用事件的驱动机制,内核中会维护着一个就绪队列,当某个文件描述符有事件发生时,则通过回调函数内核将其事件加入到这个就绪队列中,当用户调用epoll_wait接口时,通过就绪队列是否为空来判断是否有某个文件描述符的事件就绪,如果不为空,则说明有文件描述符就绪,则返回就绪队列中文件描述符的个数,因此epoll检测是否有文件描述符就绪的时间复杂度时O(1)。
二. epoll相关系统的调用
1. epoll_create
int epoll_create(int size);
调用epoll_create后,内核会创建一个epoll_create对象,对象中包括跟踪检测事件的红黑树,就绪队列,回调机制。
- 参数: 自从linux2.6.8之后,size参数是被忽略的。
- 返回值:创建epoll_create后会返回一个epoll对象的文件描述符,调用者可以通过文件描述符访问到epoll对象
2. epoll_ctl
epoll_ctl 接口是用来 维护 epoll 对象中红黑树的节点,epoll_ctl可以在红黑树中添加,删除,修改节点。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
- epfd:eppoll对象的文件描述符
- op:选择修改epoll中红黑树的方式,如下:
- EPOLL_CTL_ADD:往红黑树中插入节点
- EPOLL_CTL_MOD:修改红黑树中的节点的信息。
- EPOLL_CTL_DELL:删除红黑树中节点。
- fd:文件描述符。
- epoll_event保存的是事件信息,他的结构体如下:
struct epoll_event
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
;
typedef union epoll_data
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
epoll_data_t;
events本质是一个位图,它是用来表示事件的等待方式和事件的工作方式, 相对应的宏定义如下:
- EPOLLIN:表示读事件
- EPOLLOUT:表示写事件。
- EPOLLPRI:表示有紧急数据可以读。
- EPOLLET:表示使用ET的工作方式。
如果想要设置events多个条件,可以将用" | “表示,比如,既想要读事件又想要是ET的触发事件方式,则可以用 EPOLLIN | EPOLLET 表示。
epoll_data是一个联合体,他只能记录一个信息,他可以是指针,或者是一个文件描述符等等.如果是epoll服务器,epoll_data中一般记录的是socket文件描述符.
返回值
调用成功,返回0,调用失败返回-1,并设置errno错误码.
epoll_ctl本质是 以 fd-event 作为 key-value 映射关系插入到红黑树中,底层会根据红黑树节点的event中 events 判断是 需要检测读事件 还是检测写事件。
3. epoll_wait
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
描述:调用epoll_wait能够获取就绪队列中已经就绪的事件。
- epfd:epoll对象的文件描述符
- events:将从就绪队列中获取到的事件信息保存进 events 数组中中,上层就可以通过eventsl中获取到事件信息判断接下来的操作,如果想要从就绪队列中取出多个文件描述符信息,则需要传进去一个event_poll类型的数组。
- maxevents:期望获取就绪队列中的事件的个数
- timeout:在内核阻塞为timeout秒,直到就绪队列不为空。
- 返回值:成功返回获取到的事件的数量,返回0,表明在timeout时间内就绪队列一直为空,返回-1表示epoll_wait发生错误,并且设置errno错误码.
epoll_wait本质是从就绪队列中已经就绪的节点event信息复制上来,上层可以通过event的信息判断是哪一个文件描述符事件就绪,events中判断是读事件就绪还是写事件就绪
三. epoll工作方式
epoll有两种触发模式,一种是水平触发(level-triggered,LT),
另一种是边缘触发(edgetriggered,ET),epoll默认的的工作方式是LT,如果想要设置ET工作方式,需要使用epoll_ctl进行设置。
1. 水平触发模式(level-triggered,LT)
使用水平触发模式,当socket缓冲区如果一直有数据,则就会一直触发回调函数将其socket的事件加入到就绪队列中,只有当socket缓冲区中没有数据,才不会触发回调函数.水平触发模式的socket,水平触发模式的socket可以不用一次性读取socket缓冲区中的数据,因为只要socket缓冲区有数据,则会一直触发回调函数,将socket的事件加入到就绪队列中,上层调用epoll_wait则就可以一直获得到该socket文件描述符.
2. 边缘触发模式(edgetriggered,ET)
- 使用边缘触发,当底层的socket文件描述符中的缓冲区出现变化的时候(缓冲区数据从无到有,从有到多),才会触发回调函数将socket的事件加入到就绪队列中。
- 如果socket缓冲区中没有发生变化,则socket一直不会被触发.即使相对应的socket缓冲区中有数据,。
- 如果是ET模式触发的socket,则每次都需要通过循环调用recv将事件中的socket缓冲区中的数据读取干净,如果没有将数据读取干净,那么下次socket的缓冲区没有数据就绪,那么就一直不会触发socket事件,socket事件就不会加入到就绪队列中,那么socket缓冲区剩下的数据就一直不会读取上来.
- 这里有个小细节,如果是ET模式触发的socket,那么往 socket中的缓冲区读取数据时,使用recv 或者 read 等接口时 去读取缓冲区的数据 一定要设置为非阻塞,因为每次读取都需要循环调用recv接口 去读取socket缓冲区的数据,最后一次读取socket缓冲区一定是为空,则最后一次recv不会读取的时候不会被阻塞在内核中。如果调用read,recv是阻塞读取,那么读取到socket缓冲区为空时,则read,recv则会阻塞在内核中,等待socket数据就绪,此时就相当于破坏了epoll的作用,因为epoll的作用本质是消除recv和read等接口等待数据就绪的过程。
总结:
使用边缘触发模式的效率相比使用水平触发模式的效率更高,因为 边缘触发模式 会逼迫上层一次性读完缓冲区,如果没有读取干净,则剩下的数据可能就不会读取到。每次检测socket触发回调机制,回调机制是会消耗cpu资源。
四.简易的epoll服务器代码编写
server.hpp文件
#pragma once
#include<iostream>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<string.h>
#define LOG_NUM 5
using std::cout;
using std::endl;
namespace sjp
class server
public:
server()
~server()
//创建套接字
static int Socket()
int sockfd=socket(AF_INET,SOCK_STREAM,0);
return sockfd;
//绑定套接字接口
static bool Bind(int sockfd,unsigned short int port)
struct sockaddr_in s;
memset(&s,'\\0',sizeof(s));
s.sin_family=AF_INET;
s.sin_port=htons(port);
s.sin_addr.s_addr=0;
if(bind(sockfd,(struct sockaddr*)&s,sizeof(s))<0)
cout<<"bind error"<<endl;
_exit(-1);
return true;
//监听套接字
static bool Listen(int sockfd)
int i=listen(sockfd,LOG_NUM);
if(i==-1)
cout<<"listen fail"<<endl;
_exit(-2);
return true;
;
epoll_server.hpp文件
#pragma once
#include"server.hpp"
#define NUM 1024
#define WAIT_NUM 32;
#include<sys/epoll.h>
namespace ep_server
class EpollServer
private:
int port;//端口号
int listen_sock;//监听套接字
int epfd;
public:
EpollServer(int _port):port(_port)
void InitServer()
listen_sock=sjp::server::Socket();
sjp::server::Bind(listen_sock,port);
sjp::server::Listen(listen_sock);
epfd=epoll_create(NUM);
void Run()
Addevent(listen_sock,EPOLLIN);
while(1)
struct epoll_event ep[32];//保存就绪事件
int sz=epoll_wait(epfd,ep,32,1000);//sz是获取就绪事件的个数
if(sz>0)
for(int i=0;i<sz;i++)
if(ep[i].events==EPOLLIN)
//可读事件
if(ep[i].data.fd==listen_sock)
//监听套接字就绪,读取socket
//LT触发可以不用一次性将所有链接读上来
//读取到事件的信息
struct sockaddr peer;
socklen_t len;
int fd=accept(listen_sock,&peer,&len);
if(fd>0)
Addevent(fd,EPOLLIN);//将新的socket添加到红黑树中
else
char str[1024];
size_t sz=recv(ep[i].data.fd,(void*)str,1024,MSG_DONTWAIT);
if(sz>0)
str[sz]='\\0';
cout<<str<<endl;
else if(sz==0)
else
//对端关闭,需要在红黑树中删除等待事件
Deletevent(ep[i].data.fd);
close(ep[i].data.fd);//关闭socket
else if(ep[i].events==EPOLLOUT)
//可写事件
else
//其他事件
else if(sz==0)
cout<<"without file fd"<<endl;
else
cout<<"epoll_wait failing"<<endl;
void Deletevent(int fd)
if(epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL)<0)
cout<<"delete event failing,fd :"<<fd<<endl;
//添加等待事件函数
void Addevent(int fd,uint64_t event)
struct epoll_event _event;
_event.events=event;
_event.data.fd=fd;
if(epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&_event)<0)
cout<<"Add epoll fail,fd :"<<fd<<endl;
;
epoll_server.cc
#include"epoll_server.hpp"
#include<stdlib.h>
void Usage()
cout<<"Usage Way: epollserver port"<<endl;
int main(int argc,char* argv[])
if(argc!=2)
Usage();
int port= atoi(argv[1]);
//创建epoll服务器对象
ep_server::EpollServer* es=new ep_server::EpollServer(port);
es->InitServer();
es->Run();
以上是关于多路转接reactor epoll ET 简单的英译汉服务的主要内容,如果未能解决你的问题,请参考以下文章
I/O多路转接模型 [select] [poll] [epoll]