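Chapter 9: I/O Multiplexing
I/O multiplexing is an important way to improve a server's efficiency and performance. On Linux, the system calls that implement it are mainly select, poll and epoll.
Let's start with the prototype of select and the macros commonly used with it: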
#include <sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
void FD_ZERO(fd_set* fdset);          // clear every bit in fdset
void FD_SET(int fd, fd_set* fdset);   // set bit fd in fdset
void FD_CLR(int fd, fd_set* fdset);   // clear bit fd in fdset
int FD_ISSET(int fd, fd_set* fdset);  // test whether bit fd in fdset is set
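First, the select prototype. nfds specifies how many file descriptors are examined; it is normally set to the largest descriptor value in any of the sets plus one, because descriptors are numbered from 0. The next three fd_set* parameters point to the descriptor sets watched for readable, writable and exceptional events respectively. The last parameter is a timer with microsecond resolution: select blocks for at most that long before execution continues. If both fields of the timeval are 0, select returns immediately; if the pointer is NULL, select blocks until some descriptor becomes ready.
Looking at the definition of the fd_set structure, we find that it contains nothing but an integer array in which every bit marks one file descriptor, so select has an upper limit on the descriptors it can monitor (FD_SETSIZE, which is 1024 with glibc). The macros above exist to simplify the bit operations on fd_set. On success, select returns the total number of ready descriptors. If no descriptor becomes ready within the timeout, select returns 0. If the program receives a signal while select is blocked, select immediately returns -1 and sets errno to EINTR.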
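The timeout and return-value rules above can be tried out on a pipe without any networking. The following is only a minimal sketch: the helper names wait_readable and select_demo are made up for this illustration and do not come from the book.

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <cassert>

// Waits up to timeout_ms for fd to become readable.
// Returns what select() returns: 1 if readable, 0 on timeout, -1 on error.
int wait_readable(int fd, int timeout_ms) {
    fd_set read_fds;
    FD_ZERO(&read_fds);       // start from an empty set
    FD_SET(fd, &read_fds);    // watch fd for readability

    struct timeval timeout;
    timeout.tv_sec = timeout_ms / 1000;
    timeout.tv_usec = (timeout_ms % 1000) * 1000;

    // nfds is the highest descriptor in any set, plus one
    return select(fd + 1, &read_fds, NULL, NULL, &timeout);
}

// Hypothetical demo: true if select() behaves as described in the text.
bool select_demo() {
    int pipefd[2];
    int ret = pipe(pipefd);
    assert(ret == 0);

    int idle = wait_readable(pipefd[0], 100);   // nothing written yet: timeout
    ssize_t n = write(pipefd[1], "x", 1);
    assert(n == 1);
    int ready = wait_readable(pipefd[0], 100);  // one byte is pending

    close(pipefd[0]);
    close(pipefd[1]);
    return idle == 0 && ready == 1;
}
```

Calling select_demo() should return true: the first wait times out with 0, the second reports one ready descriptor.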
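Under what conditions does select consider a file descriptor readable, writable or in an exceptional state? A socket is considered readable in the following cases: 1) the number of bytes in the socket's kernel receive buffer is greater than or equal to its low-water mark, so we can read the socket without blocking and the read returns a value greater than 0; 2) the peer has closed the connection, in which case the read returns 0; 3) there is a new connection request on a listening socket; 4) there is a pending error on the socket. A socket is considered writable in the following cases: 1) the free space in the socket's kernel send buffer is greater than or equal to its low-water mark, so we can write the socket without blocking and the write returns a value greater than 0; 2) the write half of the socket has been closed, and writing to such a socket triggers a SIGPIPE signal; 3) a non-blocking connect on the socket has succeeded or failed (timed out); 4) there is a pending error on the socket. There is only one exceptional condition: out-of-band data has arrived.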
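Some of these conditions are easy to observe on a connected pair of local sockets. The sketch below assumes a UNIX-domain socketpair as a stand-in for a TCP connection; is_ready, readiness and probe_readiness are names invented for this example.

```cpp
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Returns true if select() reports fd ready within 100 ms.
// want_write checks the writable set instead of the readable set.
static bool is_ready(int fd, bool want_write) {
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);
    struct timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = 100 * 1000;
    int ret = want_write ? select(fd + 1, NULL, &fds, NULL, &timeout)
                         : select(fd + 1, &fds, NULL, NULL, &timeout);
    return ret == 1 && FD_ISSET(fd, &fds);
}

struct readiness {
    bool writable_when_idle;    // send buffer is empty
    bool readable_when_idle;    // nothing has been received
    bool readable_after_close;  // peer closed the connection
    int recv_after_close;       // what recv() returns after the close
};

readiness probe_readiness() {
    int sv[2];
    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(ret == 0);

    readiness r;
    r.writable_when_idle = is_ready(sv[0], true);
    r.readable_when_idle = is_ready(sv[0], false);
    close(sv[1]);  // the peer closes its end
    r.readable_after_close = is_ready(sv[0], false);
    char buf[16];
    r.recv_after_close = (int)recv(sv[0], buf, sizeof(buf), 0);
    close(sv[0]);
    return r;
}
```

On an idle connection the socket is writable but not readable; once the peer closes, it becomes readable and recv returns 0, matching readable case 2 and writable case 1 above.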
Let's use an example to see how a select program is written and how select handles normal data and out-of-band data at the same time:
/*************************************************************************
  > File Name: 9-1.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sat 03 Feb 2018 07:23:52 PM PST
 ************************************************************************/

// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, select and standard C headers used below.
#include "head.h"
using namespace std;

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    inet_pton(AF_INET, ip, &address.sin_addr);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof(client_address);
    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
    if(connfd < 0) {
        // Without a connection there is nothing to select on
        printf("errno is: %d\n", errno);
        close(listenfd);
        return 1;
    }

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);

    while(1) {
        memset(buf, 0, sizeof(buf));
        // select overwrites the sets on return, so set the bits again before every call
        FD_SET(connfd, &read_fds);
        FD_SET(connfd, &exception_fds);
        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
        if(ret < 0) {
            printf("selection failure\n");
            break;
        }
        // Readable event: receive normal data (one byte is kept for the terminator)
        if(FD_ISSET(connfd, &read_fds)) {
            ret = recv(connfd, buf, sizeof(buf) - 1, 0);
            if(ret <= 0) break;
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        memset(buf, 0, sizeof(buf));
        // Exceptional event: receive out-of-band data with the MSG_OOB flag
        if(FD_ISSET(connfd, &exception_fds)) {
            ret = recv(connfd, buf, sizeof(buf) - 1, MSG_OOB);
            if(ret <= 0) break;
            printf("get %d bytes of oob data: %s\n", ret, buf);
        }
    }
    close(connfd);
    close(listenfd);
    return 0;
}
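For the client we reuse program 5-6 from earlier to send normal data and out-of-band data. During testing I ran into a problem: in the book, the FD_ISSET checks for the read set and the exception set are written as if and else if, and with that version the test printed only the normal data and never the out-of-band data. Making the client sleep after sending the out-of-band data lets the server receive it correctly, and so does using two separate if checks as in my program above. My analysis of the cause: with if and else if only one of the two branches can run per iteration, while the peer sends both normal and out-of-band data, which leads to the result I saw. If the client sleeps, the server can return from select and read the out-of-band data on a later pass. With two if checks both conditions are tested on every pass, so out-of-band data can no longer be left unhandled.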
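The poll system call is similar to select: it also polls a given number of file descriptors for a given period of time to test whether any of them is ready. Its timeout is expressed in milliseconds, and when timeout is -1 the poll call blocks until some event occurs.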
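Since the text does not show poll in isolation, here is a minimal sketch of a single-descriptor poll on a pipe. The helper names poll_for_input and poll_demo are assumptions made for this example only.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>

// Polls one descriptor for input.
// Returns the reported revents, or 0 on timeout or error.
short poll_for_input(int fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;  // what we ask for
    pfd.revents = 0;      // what the kernel reports back
    int ret = poll(&pfd, 1, timeout_ms);
    if (ret <= 0) return 0;
    return pfd.revents;
}

// Hypothetical demo: true if poll() behaves as described in the text.
bool poll_demo() {
    int pipefd[2];
    int ret = pipe(pipefd);
    assert(ret == 0);

    short before = poll_for_input(pipefd[0], 50);  // empty pipe: times out
    ssize_t n = write(pipefd[1], "x", 1);
    assert(n == 1);
    // A timeout of -1 blocks until an event occurs; data is already pending,
    // so this call returns at once.
    short after = poll_for_input(pipefd[0], -1);

    close(pipefd[0]);
    close(pipefd[1]);
    return before == 0 && (after & POLLIN) != 0;
}
```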
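epoll is an I/O multiplexing facility specific to Linux, and its implementation differs greatly from select and poll. epoll keeps the events the user cares about in an event table inside the kernel, so there is no need to pass the descriptor set or event set again on every call as select and poll require. The price is that epoll needs one extra file descriptor to identify this kernel event table. Unlike poll, when epoll detects events it copies all ready events from the kernel event table into the array pointed to by events, which greatly improves the efficiency with which the application finds the ready descriptors: the time complexity drops from O(n) to O(1). Here are the epoll functions: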
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
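The size parameter of the creation function epoll_create no longer has any effect; it is only a hint telling the kernel how large the event table needs to be (since Linux 2.6.8 it is ignored, but it must still be greater than zero). In epoll_ctl, the op parameter specifies the operation type, one of register (EPOLL_CTL_ADD), modify (EPOLL_CTL_MOD) and delete (EPOLL_CTL_DEL), and the event parameter describes the event.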
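The three operation types can be exercised in a few lines. This sketch uses a UNIX-domain socketpair instead of a TCP connection, and the names ready_count and epoll_ctl_demo are invented for the illustration.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Number of ready events right now (timeout 0 means "do not block").
static int ready_count(int epollfd) {
    struct epoll_event events[8];
    return epoll_wait(epollfd, events, 8, 0);
}

// Hypothetical demo of EPOLL_CTL_ADD, EPOLL_CTL_MOD and EPOLL_CTL_DEL.
bool epoll_ctl_demo() {
    int sv[2];
    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(ret == 0);
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    struct epoll_event event;
    event.data.fd = sv[0];
    event.events = EPOLLIN;
    ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, sv[0], &event);  // register
    assert(ret == 0);
    int after_add = ready_count(epollfd);  // no data has arrived yet

    event.events = EPOLLOUT;
    ret = epoll_ctl(epollfd, EPOLL_CTL_MOD, sv[0], &event);  // modify
    assert(ret == 0);
    int after_mod = ready_count(epollfd);  // an idle socket is writable

    ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, sv[0], &event);  // delete
    assert(ret == 0);
    int after_del = ready_count(epollfd);  // nothing is registered any more

    close(epollfd);
    close(sv[0]);
    close(sv[1]);
    return after_add == 0 && after_mod == 1 && after_del == 0;
}
```

The same epoll descriptor is reused for all three calls; only the interest set stored in the kernel changes.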
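epoll operates on file descriptors in one of two modes, LT (level-triggered) and ET (edge-triggered). LT is the default; in this mode epoll behaves like a slightly improved poll and is not significantly more efficient. ET is epoll's high-efficiency mode. For a descriptor in LT mode, when epoll_wait detects an event on it and notifies the application, the application does not have to handle the event immediately: the next time epoll_wait is called it reports the event again, and keeps doing so until the event is handled. For a descriptor in ET mode, when epoll_wait detects an event and notifies the application, the application must handle it right away, because later epoll_wait calls will not report this event again. ET mode therefore reduces the number of times the same epoll event is triggered, which is why it is more efficient. Let's look at an example: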
/*************************************************************************
  > File Name: 9-3.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sat 03 Feb 2018 10:35:56 PM PST
 ************************************************************************/

// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, epoll, fcntl and standard C headers used below.
#include "head.h"
using namespace std;

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

// Put the file descriptor into non-blocking mode; returns the old flags
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// Register fd for EPOLLIN in one of the two modes (LT by default, ET on request)
void addfd(int epollfd, int fd, bool enable_et) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if(enable_et) event.events |= EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

// LT workflow: read once per notification; leftover data is reported again
void lt(epoll_event* events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i ++) {
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0) continue;
            addfd(epollfd, connfd, false);
        }
        else if(events[i].events & EPOLLIN) {
            printf("event trigger once\n");
            memset(buf, 0, sizeof(buf));
            // Leave one byte so buf stays NUL-terminated for printf
            int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
            if(ret <= 0) {
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        }
        else printf("something else happened\n");
    }
}

// ET workflow: the event is reported once, so drain the socket in a loop
void et(epoll_event* events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i ++) {
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0) continue;
            addfd(epollfd, connfd, true);
        }
        else if(events[i].events & EPOLLIN) {
            // This code will not be triggered again, so we read in a loop
            printf("event trigger once\n");
            while(1) {
                memset(buf, 0, sizeof(buf));
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if(ret < 0) {
                    // For non-blocking I/O this condition means all data has been read
                    if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if(ret == 0) {
                    // Peer closed the connection: stop reading the closed descriptor
                    close(sockfd);
                    break;
                }
                else printf("get %d bytes of content: %s\n", ret, buf);
            }
        }
        else printf("something else happened\n");
    }
}

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    inet_pton(AF_INET, ip, &address.sin_addr);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, true);

    while(1) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(ret < 0) {
            printf("epoll failure\n");
            break;
        }
        // Switch between the two handlers to compare the modes
        //lt(events, ret, epollfd, listenfd);
        et(events, ret, epollfd, listenfd);
    }
    close(listenfd);
    return 0;
}
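In the experiment above, with the same 25 characters sent each time, epoll notified the application three times in the first mode (LT) but only once in the second mode (ET).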
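The notification counts can also be reproduced without a client program. The sketch below is an assumption-laden stand-in for the experiment: it uses a local socketpair rather than TCP, reads at most 10 bytes per notification, and the function name count_notifications is made up for this example.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Sends 25 bytes, then reads at most 10 bytes per notification.
// Returns how many times epoll_wait reported the receiving socket.
int count_notifications(bool enable_et) {
    int sv[2];
    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(ret == 0);
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    struct epoll_event event;
    event.data.fd = sv[0];
    event.events = EPOLLIN;
    if (enable_et) event.events |= EPOLLET;
    ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, sv[0], &event);
    assert(ret == 0);

    const char msg[] = "abcdefghijklmnopqrstuvwxy";  // 25 characters
    ssize_t sent = send(sv[1], msg, 25, 0);
    assert(sent == 25);

    int notifications = 0;
    char buf[10];
    struct epoll_event ready[1];
    // Timeout 0: check the event table without blocking
    while (epoll_wait(epollfd, ready, 1, 0) == 1) {
        notifications++;
        // Deliberately read only part of the pending data
        ssize_t n = recv(sv[0], buf, sizeof(buf), MSG_DONTWAIT);
        if (n <= 0) break;
    }

    close(epollfd);
    close(sv[0]);
    close(sv[1]);
    return notifications;
}
```

In LT mode the socket keeps being reported while data remains (10 + 10 + 5 bytes, three notifications). In ET mode it is reported once, and the remaining 15 bytes stay unread because no new data arrives to trigger another edge, which is exactly why ET handlers must drain the socket in a loop.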
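Even in ET mode, an event on a socket can still be triggered more than once. For example, a thread reads the data on a socket and starts processing it; while it is processing, new data becomes readable on the same socket and another thread is woken up to read it, so two threads end up operating on one socket at the same time. What we want is for a socket connection to be handled by only one thread at any moment, and this can be achieved with the EPOLLONESHOT event. For a descriptor registered with EPOLLONESHOT, the operating system triggers at most one of the readable, writable or exceptional events registered on it, and triggers it only once, unless we re-register the descriptor with epoll_ctl. While one thread is handling a socket, no other thread gets a chance to operate on it. Consequently, as soon as that thread has finished, it should reset the EPOLLONESHOT event on the socket so that EPOLLIN can be triggered on it again. Here is the code: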
/*************************************************************************
  > File Name: 9-4.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sun 04 Feb 2018 12:24:13 AM PST
 ************************************************************************/

// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, epoll, fcntl, pthread and standard C headers used below.
#include "head.h"
using namespace std;

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds {
    int epollfd;
    int sockfd;
};

int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// Register fd for EPOLLIN in ET mode; oneshot decides whether EPOLLONESHOT is added
void addfd(int epollfd, int fd, bool oneshot) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if(oneshot) event.events |= EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

// Re-arm the EPOLLONESHOT registration so that EPOLLIN can fire on fd again
void reset_oneshot(int epollfd, int fd) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

void* worker(void* arg) {
    fds* args = (fds*)arg;
    int sockfd = args->sockfd;
    int epollfd = args->epollfd;
    delete args;  // allocated by main for this thread only
    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE];
    memset(buf, 0, sizeof(buf));
    while(1) {
        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
        if(ret == 0) {
            close(sockfd);
            printf("foreigner closed the connection\n");
            break;
        }
        else if(ret < 0) {
            if(errno == EAGAIN) {
                // Nothing left to read: re-arm the socket and let the thread end
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
            // Any other error: give up on this connection
            close(sockfd);
            break;
        }
        else {
            printf("get content: %s\n", buf);
            memset(buf, 0, sizeof(buf));
            sleep(5);  // simulate the time spent processing the data
        }
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
    return NULL;
}

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    inet_pton(AF_INET, ip, &address.sin_addr);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    // The listening socket must not be one-shot, or only one connection would ever be accepted
    addfd(epollfd, listenfd, false);

    while(1) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(ret < 0) {
            printf("epoll failure\n");
            break;
        }
        for(int i = 0; i < ret; i ++) {
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd) {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                if(connfd < 0) continue;
                addfd(epollfd, connfd, true);
            }
            else if(events[i].events & EPOLLIN) {
                pthread_t thread;
                // Heap-allocated so the argument outlives this loop iteration
                fds* fds_for_new_worker = new fds;
                fds_for_new_worker->epollfd = epollfd;
                fds_for_new_worker->sockfd = sockfd;
                pthread_create(&thread, NULL, worker, (void*)fds_for_new_worker);
                pthread_detach(thread);
            }
            else printf("something else happened\n");
        }
    }
    close(listenfd);
    return 0;
}
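In the code we use sleep(5) to simulate data processing. While the data was being processed we sent data again, and the server did not start another thread to continue the work; the original thread handled it. In the output, both batches of data were handled by the thread serving descriptor 5. This keeps each connection in the hands of a single thread and so avoids many potential race conditions.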
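The one-shot behavior itself can be checked in a single thread. This is a minimal sketch on a local socketpair, not the multi-threaded server above; pending_events and oneshot_demo are names invented for the example.

```cpp
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>

// Number of events epoll reports right now (timeout 0, no blocking).
static int pending_events(int epollfd) {
    struct epoll_event ready[1];
    return epoll_wait(epollfd, ready, 1, 0);
}

// Hypothetical demo: true if EPOLLONESHOT behaves as described in the text.
bool oneshot_demo() {
    int sv[2];
    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
    assert(ret == 0);
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    struct epoll_event event;
    event.data.fd = sv[0];
    event.events = EPOLLIN | EPOLLONESHOT;
    ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, sv[0], &event);
    assert(ret == 0);

    ssize_t n = send(sv[1], "a", 1, 0);
    assert(n == 1);
    int first = pending_events(epollfd);   // reported once

    n = send(sv[1], "b", 1, 0);
    assert(n == 1);
    int second = pending_events(epollfd);  // disabled: new data is not reported

    // Re-arm the descriptor, as reset_oneshot does in the server
    ret = epoll_ctl(epollfd, EPOLL_CTL_MOD, sv[0], &event);
    assert(ret == 0);
    int third = pending_events(epollfd);   // pending data is reported again

    close(epollfd);
    close(sv[0]);
    close(sv[1]);
    return first == 1 && second == 0 && third == 1;
}
```

Between the first report and the EPOLL_CTL_MOD call, no other caller of epoll_wait can be told about this socket, which is the property the worker threads rely on.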
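Having looked at the three I/O multiplexing mechanisms in detail, let's compare their similarities, differences, strengths and weaknesses. All three can monitor a number of file descriptors for a period of time, and when events occur on one or more descriptors the call returns the number of ready descriptors. The fd_set type used by select does not bind a descriptor to an event, so we have to supply three separate sets, and select can only monitor these three kinds of event. In addition, the kernel modifies the fd_set sets in place, so the application has to reset all three sets before the next call to select. poll puts all the events for a descriptor together, which makes the programming interface simpler, and on each call the kernel only modifies the revents member of pollfd while events stays unchanged. epoll is completely different: it maintains an event table in the kernel, and each epoll_wait call takes the registered events directly from this table without reading them from user space again.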
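The difference between select's in-place modification and poll's separate revents field shows up after a single timed-out call. The following sketch uses an empty pipe; the function names are hypothetical and chosen only for this illustration.

```cpp
#include <poll.h>
#include <sys/select.h>
#include <unistd.h>
#include <cassert>

// After a select() call that times out, is our bit still set in the set?
bool select_keeps_bit(int fd) {
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(fd, &read_fds);
    struct timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = 0;  // return immediately
    select(fd + 1, &read_fds, NULL, NULL, &timeout);
    return FD_ISSET(fd, &read_fds) != 0;
}

// After a poll() call that times out, is the events field unchanged?
bool poll_keeps_events(int fd) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    poll(&pfd, 1, 0);  // timeout 0: return immediately
    return pfd.events == POLLIN;
}

// Hypothetical demo on the read end of an empty pipe.
bool interface_demo() {
    int pipefd[2];
    int ret = pipe(pipefd);
    assert(ret == 0);
    bool select_kept = select_keeps_bit(pipefd[0]);  // the kernel cleared the bit
    bool poll_kept = poll_keeps_events(pipefd[0]);   // events was left alone
    close(pipefd[0]);
    close(pipefd[1]);
    return !select_kept && poll_kept;
}
```

This is why the select example sets its bits again at the top of every loop iteration, while a poll loop can reuse the same pollfd array unchanged.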
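In addition, poll and epoll can both monitor as many file descriptors as the system allows (the book gives 65535; the actual figure depends on system configuration, such as the limit on open files), whereas select is limited because fd_set is essentially an integer array in which each bit represents one descriptor.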
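In terms of efficiency and working principle, select and poll both work only in the less efficient LT mode and both rely on polling: every call scans the whole registered set. epoll is different. It supports the efficient ET mode, and it supports the EPOLLONESHOT event to further reduce how often readable, writable and exceptional events are triggered. Moreover, it works by callback: when the kernel detects a ready descriptor it invokes a callback that inserts the corresponding event into the kernel's ready queue, and the contents of the ready queue are finally copied to user space. When many connections are active, however, epoll is not necessarily more efficient than select and poll, because the callback is then triggered very frequently. epoll_wait is therefore best suited to cases with many connections of which few are active.
When we call connect on a non-blocking socket and the connection is not established immediately, we can use select, poll or a similar function to watch for a writable event on the socket. When the call returns, we use getsockopt to read and clear the socket's error code: an error code of 0 means the connection was established successfully, anything else means it failed. Let's see how this is implemented: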
/*************************************************************************
  > File Name: 9-5.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sun 04 Feb 2018 07:47:09 PM PST
 ************************************************************************/

// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, select, fcntl and standard C headers used below.
#include "head.h"
using namespace std;

#define BUFFER_SIZE 1024

int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// Non-blocking connect: returns the connected socket on success, -1 on failure
int unblock_connect(const char* ip, int port, int time) {
    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    int fdopt = setnonblocking(sockfd);
    ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
    if(ret == 0) {
        // Connected at once: restore the original flags and return
        printf("connect with server immediately\n");
        fcntl(sockfd, F_SETFL, fdopt);
        return sockfd;
    }
    else if(errno != EINPROGRESS) {
        // Only EINPROGRESS means the connection is still being established
        printf("unblock connect not support\n");
        close(sockfd);
        return -1;
    }
    fd_set writefds;
    struct timeval timeout;

    // Watch sockfd for writability; the bit must be set, or select reports nothing
    FD_ZERO(&writefds);
    FD_SET(sockfd, &writefds);

    timeout.tv_sec = time;
    timeout.tv_usec = 0;

    ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
    if(ret <= 0) {
        printf("connection time out\n");
        close(sockfd);
        return -1;
    }

    if(!FD_ISSET(sockfd, &writefds)) {
        printf("no events on sockfd found\n");
        close(sockfd);
        return -1;
    }

    int error = 0;
    socklen_t length = sizeof(error);
    // Fetch and clear the pending error on sockfd
    if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) {
        printf("get socket option failed\n");
        close(sockfd);
        return -1;
    }

    // A non-zero error code (not errno) means the connection failed
    if(error != 0) {
        printf("connection failed after select with the error: %d\n", error);
        close(sockfd);
        return -1;
    }

    printf("connection ready after select with the socket: %d\n", sockfd);
    fcntl(sockfd, F_SETFL, fdopt);
    return sockfd;
}

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    int sockfd = unblock_connect(ip, port, 10);
    if(sockfd < 0) return 1;
    close(sockfd);
    return 0;
}
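Since the text mentions that poll can be used for the same purpose, here is a sketch of the poll variant. It is an illustration under assumptions rather than the book's code: connect_with_poll and connect_demo are hypothetical names, and the demo connects to a listener it creates itself on an ephemeral loopback port.

```cpp
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstring>

// Non-blocking connect that waits with poll(); returns the socket or -1.
int connect_with_poll(const struct sockaddr_in& address, int timeout_ms) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) return -1;
    int old_option = fcntl(sockfd, F_GETFL);
    fcntl(sockfd, F_SETFL, old_option | O_NONBLOCK);

    int ret = connect(sockfd, (const struct sockaddr*)&address, sizeof(address));
    if (ret != 0) {
        if (errno != EINPROGRESS) { close(sockfd); return -1; }
        struct pollfd pfd;
        pfd.fd = sockfd;
        pfd.events = POLLOUT;  // writable means the connect has finished
        pfd.revents = 0;
        if (poll(&pfd, 1, timeout_ms) <= 0) { close(sockfd); return -1; }
        int error = 0;
        socklen_t length = sizeof(error);
        // SO_ERROR tells success (0) from failure (non-zero)
        if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0 || error != 0) {
            close(sockfd);
            return -1;
        }
    }
    fcntl(sockfd, F_SETFL, old_option);  // restore the original flags
    return sockfd;
}

// Hypothetical self-check: listen on a loopback port and connect to it.
bool connect_demo() {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    address.sin_port = 0;  // let the kernel pick a free port
    int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret == 0);
    ret = listen(listenfd, 5);
    assert(ret == 0);
    socklen_t length = sizeof(address);
    ret = getsockname(listenfd, (struct sockaddr*)&address, &length);
    assert(ret == 0);

    int sockfd = connect_with_poll(address, 1000);
    bool ok = sockfd >= 0;
    if (ok) close(sockfd);
    close(listenfd);
    return ok;
}
```

With poll there is no descriptor set to initialize, so the mistake of waiting on an empty set cannot happen; the SO_ERROR check stays exactly the same.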
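Many programs have to handle network connections and user input at the same time, a chat room being a typical case, and this can be done with I/O multiplexing. We use a chat room implemented with poll as an example.
First, the client: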
/*************************************************************************
  > File Name: 9-6.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sun 04 Feb 2018 08:27:40 PM PST
 ************************************************************************/

#define _GNU_SOURCE 1  // needed for POLLRDHUP and splice
// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, poll, fcntl and standard C headers used below.
#include "head.h"
using namespace std;

#define BUFFER_SIZE 64

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    struct sockaddr_in server_address;
    bzero(&server_address, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(port);
    inet_pton(AF_INET, ip, &server_address.sin_addr);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(sockfd >= 0);
    if(connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
        printf("connection failed\n");
        close(sockfd);
        return 1;
    }
    // Watch standard input (fd 0) and the socket at the same time
    pollfd fds[2];
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;
    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe(pipefd);
    assert(ret != -1);

    while(1) {
        ret = poll(fds, 2, -1);
        if(ret < 0) {
            printf("poll failure\n");
            break;
        }

        if(fds[1].revents & POLLRDHUP) {
            printf("server close the connection\n");
            break;
        }
        else if(fds[1].revents & POLLIN) {
            memset(read_buf, 0, sizeof(read_buf));
            // Leave one byte so read_buf stays NUL-terminated for printf
            recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
            printf("%s\n", read_buf);
        }

        if(fds[0].revents & POLLIN) {
            // Move user input to the socket through a pipe without copying to user space
            ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
            ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
        }
    }
    close(sockfd);
    return 0;
}
Now the server:
/*************************************************************************
  > File Name: 9-7.cpp
  > Author: Torrance_ZHANG
  > Created Time: Sun 04 Feb 2018 10:35:16 PM PST
 ************************************************************************/

#define _GNU_SOURCE 1  // needed for POLLRDHUP
// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, poll, fcntl and standard C headers used below.
#include "head.h"
using namespace std;

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

// Per-client data: address, pointer to the data waiting to be sent, read buffer
struct client_data {
    sockaddr_in address;
    char* write_buf;
    char buf[BUFFER_SIZE];
};

int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    inet_pton(AF_INET, ip, &address.sin_addr);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    // users is indexed directly by socket descriptor; fds[0] is the listening socket
    client_data* users = new client_data[FD_LIMIT];
    pollfd fds[USER_LIMIT + 1];
    int user_counter = 0;
    for(int i = 1; i <= USER_LIMIT; i ++) {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while(1) {
        ret = poll(fds, user_counter + 1, -1);
        if(ret < 0) {
            printf("poll failure\n");
            break;
        }

        for(int i = 0; i < user_counter + 1; i ++) {
            if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

                if(connfd < 0) {
                    printf("errno is: %d\n", errno);
                    continue;
                }

                if(user_counter >= USER_LIMIT) {
                    const char* info = "too many users\n";
                    printf("%s", info);
                    // strlen, not sizeof: info is a pointer
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }

                user_counter ++;
                users[connfd].address = client_address;
                users[connfd].write_buf = NULL;
                setnonblocking(connfd);
                fds[user_counter].fd = connfd;
                fds[user_counter].events = POLLIN | POLLERR | POLLRDHUP;
                fds[user_counter].revents = 0;
                printf("comes a new user, now have %d users\n", user_counter);
            }
            else if(fds[i].revents & POLLERR) {
                printf("get an error from %d\n", fds[i].fd);
                char errors[100];
                memset(errors, 0, sizeof(errors));
                socklen_t length = sizeof(errors);
                if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) {
                    printf("get socket option failed\n");
                }
                continue;
            }
            else if(fds[i].revents & POLLRDHUP) {
                // Client left: move the last entry into its slot
                users[fds[i].fd] = users[fds[user_counter].fd];
                close(fds[i].fd);
                fds[i] = fds[user_counter];
                user_counter --;
                i --;  // look at the moved entry in the next iteration
                printf("a client left\n");
            }
            else if(fds[i].revents & POLLIN) {
                int connfd = fds[i].fd;
                memset(users[connfd].buf, '\0', BUFFER_SIZE);
                ret = recv(connfd, users[connfd].buf, BUFFER_SIZE - 1, 0);
                if(ret < 0) {
                    if(errno != EAGAIN) {
                        close(connfd);
                        users[fds[i].fd] = users[fds[user_counter].fd];
                        fds[i] = fds[user_counter];
                        user_counter --;
                        i --;
                    }
                }
                else if(ret == 0) {
                    // Peer closed; the POLLRDHUP branch above handles the cleanup
                }
                else {
                    // Data received: tell every other client to get ready to send it
                    for(int j = 1; j <= user_counter; j ++) {
                        if(fds[j].fd == connfd) continue;
                        fds[j].events &= ~POLLIN;  // clear the bit (|= ~POLLIN would set all the others)
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if(fds[i].revents & POLLOUT) {
                int connfd = fds[i].fd;
                if(!users[connfd].write_buf) continue;
                ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
                users[connfd].write_buf = NULL;
                // Data sent: go back to listening for input
                fds[i].events &= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }
    delete []users;
    close(listenfd);
    return 0;
}
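This program has only one tricky part: when a user leaves, we move the last entry of the user list into the slot of the departing user, which guarantees that the first user_counter entries always hold live clients. However, as I understand it there may be a problem in the step after a POLLIN event is handled, where we stop listening for POLLIN and register POLLOUT instead. Consider an extreme case: after POLLIN has been dropped and POLLOUT registered, the main thread moves on to the next poll event, and meanwhile new data arrives on the descriptor that was just modified. Because POLLIN is no longer registered at that moment, POLLIN will not be triggered for it in the next round of polling. I changed the following else if in the code to an if, so that POLLOUT is checked once more after the POLLIN check, and in my tests this caused no problems. The point is still open to debate, though, and I leave the question to be settled later.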
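The swap-with-last removal described above can be looked at in isolation. This is a small sketch with a hypothetical helper, remove_client, operating on a pollfd array whose slot 0 is reserved for the listening socket as in the server.

```cpp
#include <poll.h>
#include <cassert>

// Removes the client in slot i (1 <= i <= count) by overwriting it with the
// last client, so slots 1..count-1 stay densely filled.
// Returns the new client count.
int remove_client(struct pollfd* fds, int i, int count) {
    assert(i >= 1 && i <= count);
    fds[i] = fds[count];   // the last entry takes over the freed slot
    fds[count].fd = -1;    // poll() ignores entries with a negative fd
    return count - 1;
}
```

A caller iterating over the array should step its index back by one after the call, since slot i now holds a different client whose revents has not been examined yet.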
Finally, I/O multiplexing can also be used to serve TCP and UDP on the same port at the same time:
/*************************************************************************
  > File Name: 9-8.cpp
  > Author: Torrance_ZHANG
  > Created Time: Tue 06 Feb 2018 03:23:19 AM PST
 ************************************************************************/

// "head.h" is the author's collective header (not shown here); it is assumed
// to pull in the socket, epoll, fcntl and standard C headers used below.
#include "head.h"
using namespace std;

#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024

int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

// Register fd for EPOLLIN in ET mode and make it non-blocking
void addfd(int epollfd, int fd) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

int main(int argc, char** argv) {
    if(argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    // Create the TCP socket and bind it to the port
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    // Create the UDP socket and bind it to the same port
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);
    int udpfd = socket(AF_INET, SOCK_DGRAM, 0);
    assert(udpfd >= 0);

    ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);

    addfd(epollfd, listenfd);
    addfd(epollfd, udpfd);

    while(1) {
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0) {
            printf("epoll failure\n");
            break;
        }
        for(int i = 0; i < number; i ++) {
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd) {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                if(connfd < 0) continue;
                addfd(epollfd, connfd);
            }
            else if(sockfd == udpfd) {
                char buf[UDP_BUFFER_SIZE];
                memset(buf, 0, UDP_BUFFER_SIZE);
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);

                ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE - 1, 0, (struct sockaddr*)&client_address, &client_addrlength);
                if(ret > 0) {
                    // Echo back only the bytes that were actually received
                    sendto(udpfd, buf, ret, 0, (struct sockaddr*)&client_address, client_addrlength);
                }
            }
            else if(events[i].events & EPOLLIN) {
                char buf[TCP_BUFFER_SIZE];
                // ET mode: drain the socket until EAGAIN
                while(1) {
                    memset(buf, 0, TCP_BUFFER_SIZE);
                    ret = recv(sockfd, buf, TCP_BUFFER_SIZE - 1, 0);
                    if(ret < 0) {
                        if((errno == EAGAIN) || (errno == EWOULDBLOCK)) break;
                        close(sockfd);
                        break;
                    }
                    else if(ret == 0) {
                        // Peer closed the connection: stop reading the closed descriptor
                        close(sockfd);
                        break;
                    }
                    else send(sockfd, buf, ret, 0);
                }
            }
        }
    }
    close(listenfd);
    return 0;
}