为什么使用epoll
这个是老生常谈了,四个字,多路复用,要不单线程只能停等排队。另外select和poll不如epoll强大好用。
程序结构漫谈
代码很简陋,基本属于玩具。但是还是随便谈谈。
在单线程模型下使用epoll,只能使用一个epoll的instance同时监听socket描述符和connection描述符。当socket描述符就位时,就调用accept处理三次握手建立连接,同时将调用epoll_ctl将这个connfd加入epoll的事件监听表中。如果connfd就位,就调用recv函数将缓冲区中的数据全部读出然后原封不动的send回去。
实际上对于LT模式,每次recv是没有必要读出全部的数据,因为只要缓冲区内还有数据剩余,只要不关闭对应的fd,epoll_wait仍然会通知用户。但由于这是个echo服务器,如果在LT模式下不一次读完数据,就需要对每个连接的当前回显位置做一个缓存,所以为了方便起见,还是一次读完吧。
epoll的两种模式和注意点
epoll有LT(Level-Trigger)和ET(Edge-Trigger)模式,为了简便本文使用了LT模式。
使用LT模式的时候,sockfd和connfd都不需要设置成非阻塞(non-blocking)模式,但是使用ET模式的时候,需要将connfd设置为非阻塞模式,下面来着重讨论一下这个问题。
如果ET模式“必须”使用connfd设置为非阻塞模式的说法不太严谨,应该说“不建议”或者“不适合”。
在边缘触发模式下,EPOLLIN必须要在对端有新数据写入后才能触发,如果一次EPOLLIN事件触发后,用户代码没有将缓冲区内所有的数据取出来,那只能等待对端的下一次写入,才能把上一次遗漏的一并取出来。
试想这样一个情况,服务端和客户端在进行一问一答的双向通信,服务端需要完整解析出客户端的报文,再根据客户端的请求返回信息。这个流程完成后,客户端才能进行下一次请求。如果出现了上文描述的情况(边缘触发且没有完整读完数据),那么服务端就不会给客户端返数据,客户端收不到数据,自然就不会有下一次的对端写入,形成了一次双向等待,神似死锁。
既然epoll的ET模式必须要一次读完缓冲区的所有数据,大致应该像这么做。
setnonblock(sockfd);
while(1)
{
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if(ret < 0)
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK)) // 证明缓冲区中的数据读完了
{
printf("read later\\n");
break;
}
close(sockfd); // 证明出现了其他错误
break;
}
else if(ret == 0) //对端连接已经关闭
{
close(sockfd);
}
else
{
printf( "get %d bytes of content: %s\\n", ret, buf );
}
}
性能测试
由于是单线程echo服务器,所以为了方便起见使用一个多线程客户端循环读写数据。注意,使用单线程的客户端是没有意义的,因为单线程的客户端难以对单线程的服务端造成压力,要测试出最大的qps,必须使用多线程的客户端。
下图记录了不同线程数量所测得的QPS。
可以看到线程数较少的时候,无法测试出较为真实的QPS,因为无法对服务器造成最大负载。
Server Code
// SimpleEpollEcho.h
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <vector>
class SimpleEpollEcho {
public:
SimpleEpollEcho(bool enable_et);
~SimpleEpollEcho();
int init();
void start_lt();
private:
int SetNonblocking(int fd);
int sockfd;
int epfd;
const char *ip = "localhost";
const static int port = 8888;
const static int MAX_EPOLL_EVENT_NUM = 1024;
const static int BUFFER_SIZE = 16;
epoll_event events[MAX_EPOLL_EVENT_NUM];
char buf[BUFFER_SIZE];
};
// SimpleEpollEcho.cpp
// use Level-Trigger mode, echo message must be 16bytes long. the connfd is in blocking mode
//
#include <strings.h>
#include <cassert>
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include "SimpleEpollEcho.h"
SimpleEpollEcho::SimpleEpollEcho(bool enable_et) : enable_et(enable_et)
{
}
SimpleEpollEcho::~SimpleEpollEcho()
{
}
int SimpleEpollEcho::init()
{
int ret;
do
{
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons(port);
sockfd = socket(PF_INET, SOCK_STREAM, 0);
assert(sockfd >= 0);
int on = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
ret = bind(sockfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(sockfd, 5);
assert(ret != -1);
epfd = epoll_create(5);
assert(epfd != -1);
// add socket fd to epoll fd
{
epoll_event e;
e.events = EPOLLIN;
if(enable_et)
e.events |= EPOLLET;
e.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &e);
}
}
while(0);
}
void SimpleEpollEcho::start_lt()
{
while(1)
{
int ready = epoll_wait(epfd, events, MAX_EPOLL_EVENT_NUM, -1);
if(ready < 0)
{
std::cout << "epoll_wait error" << std::endl;
break;
}
for(int i = 0; i < ready; i++)
{
// the sockfd in epoll table will be never removed
// so if there comes a new connection, we can accept it and get the connfd
if((events[i].data.fd == sockfd) && (events[i].events & EPOLLIN))
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept(sockfd, (struct sockaddr *)&client_address, &client_addrlength);
{
epoll_event e;
e.events = EPOLLIN;
e.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &e);
}
}
// if there comes new data of some connection
// now we read the connection data echo it
else if((events[i].data.fd != sockfd) && (events[i].events & EPOLLIN))
{
int connfd = events[i].data.fd;
// if flags is 0, recv behave just like read on a socket fd
// with Level-Trigger Mode, it\'s ok to not read all data in buffer
// because if there\'s data remained, kernel will remind us again
// however, this requires every connection a index and buffer
// to store their data. In single-thread program, I suppose it\'s not good
// use multi-thread stack to store that may be more reasonable
// so in single thread, we read it all
char *pbuf = buf;
int recvn = 0, sendn = 0;
int recv_len = BUFFER_SIZE;
while(recvn < recv_len)
{
recvn += recv(connfd, pbuf, recv_len - recvn, 0);
pbuf += recvn;
}
pbuf = buf;
while(sendn < recvn)
{
sendn += send(connfd, pbuf, recvn - sendn, 0);
pbuf += sendn;
}
if(buf[0] == \'0\')
{
close(connfd);
epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
}
}
}
}
}
int SimpleEpollEcho::SetNonblocking(int fd) {
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
Client Code
#include <iostream>
#include <cassert>
#include <pthread.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <errno.h>
#include <strings.h>
const int MAX_BUF_LEN = 16;
char normal_buf[MAX_BUF_LEN] = "12345678901234\\n";
char exit_buf[MAX_BUF_LEN] = "01234567980123\\n";
const int MAX_THREAD_NUM = 64;
void* thread_test(void* ptr)
{
printf("thread start\\r\\n");
struct timeval tv_before;
struct timeval tv_after;
struct timeval tv_total_gap;
struct timezone tz;
struct sockaddr_in servaddr;
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8888);
inet_pton( AF_INET, "localhost", &servaddr.sin_addr );
socklen_t socklen = sizeof (servaddr);
int sockfd = socket(AF_INET,SOCK_STREAM, 0 );
assert(sockfd > 0);
connect(sockfd, (const struct sockaddr *)&servaddr, sizeof(servaddr));
int x = errno;
for(int j = 0; j < 10; j++)
{
gettimeofday(&tv_before, &tz);
for(int i = 0; i < 10000; i++)
{
send(sockfd, normal_buf, MAX_BUF_LEN, MSG_WAITALL);
recv(sockfd, normal_buf, MAX_BUF_LEN, MSG_WAITALL);
}
gettimeofday(&tv_after, &tz);
tv_total_gap.tv_usec += tv_after.tv_usec - tv_before.tv_usec;
tv_total_gap.tv_sec += tv_after.tv_sec - tv_before.tv_sec;
}
send(sockfd, exit_buf, MAX_BUF_LEN, MSG_WAITALL);
recv(sockfd, exit_buf, MAX_BUF_LEN, MSG_WAITALL);
printf("total second gap, %ld, total usecond gap: %ld\\r\\n", tv_total_gap.tv_sec, tv_total_gap.tv_usec);
close(sockfd);
}
int main()
{
pthread_t threads[MAX_THREAD_NUM];
for(int i = 0; i< MAX_THREAD_NUM; i++)
{
pthread_create(&threads[i], NULL, thread_test, NULL);
}
for(int i = 0; i< MAX_THREAD_NUM; i++)
{
pthread_join(threads[i], NULL);
}
return 0;
}
番外:如何进化为一个简单的Reactor
一个Reactor需要四个部分
- Resources,在这里对应sockfd和connfd
- Multiplexer,显然对应epoll
- Dispatcher,用于绑定回调函数到某个event上面
- Request Handler,回调函数
在前面的代码中,程序对于所有的请求都是无差别的回显,如果要对于不同的请求绑定不同的request handler,则需要将event做一次封装。举例如下:
typedef (*callback)(int event, int fd, void* arg);
struct ReactorEvent
{
int epoll_fd_
int fd_; // connfd
int epoll_event_; // 此fd感兴趣的epoll事件
callback cb_; // 用户自定义事件发生后的回调函数
int event_; // 回调参数,用户自定义的事件
void *arg_; // 回调参数
char *buf_; // 事件的I/O缓冲
int bufsize_
}
在执行epoll_ctl添加监听事件时,struct epoll_event中的epoll_data_t epoll_data就不能使用int fd这个域了,要使用void *ptr域,用于保存ReactorEvent的指针,否则在回调的时候找不到callback的地址。