《Linux高性能服务器编程》学习总结——I/O复用

Posted torrance

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Linux高性能服务器编程》学习总结——I/O复用相关的知识,希望对你有一定的参考价值。

 

第九章      I/O复用

  I/O复用技术是重要的提高服务器工作效率和性能的手段,Linux下实现I/O复用的系统调用主要有select、poll和epoll。

  首先我们来看一下select的函数原型和常用的宏:

1 #include<sys/select.h>
2 int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
3 FD_ZERO(fd_set *fdset);                    //清除fdset所有位
4 FD_SET(int fd, fd_set* fdset);                    //设置fdset的位fd
5 FD_CLR(int fd, fd_set* fdset);                //清除fdset的位fd
6 int FD_ISSET(int fd, fd_set* fdset);            //测试fdset的位fd是否被设置

  首先来看select函数原型,nfds指定了被监听的文件描述符的总数,其值通常被设定为所有文件描述符的最大值加一,接下来的三个fd_set*类型的参数分别指向可读可写和异常事件对应的文件描述符集合,最后一个参数是一个微秒级的定时器,表示select阻塞这个时间后继续执行,如果为0则立即返回,如果为NULL将一直阻塞。

  通过观察fd_set结构体的原型,我们发现其仅包含一个整形数组,该数组的每一位都标记了一个文件描述符,所以select有最大可监控的文件描述符的限制。后面的宏是为了简化对于fd_set的位操作。select函数成功时返回就绪文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,则select返回0,如果在select阻塞期间程序收到信号,则select立即返回-1并置errno为EINTR。

  select在何种情况下会认为文件描述符产生了可读、可写或异常情况呢?首先,当socket处于以下状态会认为其可读:1)socket内核接收缓冲区中的字节数大于或等于其低水位标记,此时我们可以无阻塞地读该socket,且读操作返回值大于0;2)socket的对端关闭连接,此时读操作返回0;3)监听socket上有新的请求;4)socket上有未处理的错误。而以下状态会认为socket可写:1)socket内核发送缓冲区中的可用字节数大于或等于其低水位标记,此时我们可以无阻塞地写该socket,且写操作返回值大于0;2)socket的写操作被关闭,对写操作关闭的socket执行写操作会触发SIGPIPE信号;3)socket使用非阻塞connect连接成功或者失败(超时)之后;4)socket上有未处理的错误。而异常情况只有一种,就是产生了带外数据。

  我们来用一个例子看一下select程序如何来写以及select如何同时处理普通数据和带外数据的:

 1 /*************************************************************************
 2     > File Name: 9-1.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Sat 03 Feb 2018 07:23:52 PM PST
 6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;
10 
11 int main(int argc, char** argv) {
12     if(argc <= 2) {
13         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
14         return 1;
15     }
16     const char* ip = argv[1];
17     int port = atoi(argv[2]);
18 
19     int ret = 0;
20     struct sockaddr_in address;
21     bzero(&address, sizeof(address));
22     address.sin_family = AF_INET;
23     address.sin_port = htons(port);
24     inet_pton(AF_INET, ip, &address.sin_addr);
25 
26     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
27     assert(listenfd >= 0);
28 
29     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
30     assert(ret != -1);
31 
32     ret = listen(listenfd, 5);
33     assert(ret != -1);
34 
35     struct sockaddr_in client_address;
36     socklen_t client_addrlength = sizeof(client_address);
37     int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
38     if(connfd < 0) {
39         printf("errno is: %d\\n", errno);
40     }
41 
42     char buf[1024];
43     fd_set read_fds;
44     fd_set exception_fds;
45     FD_ZERO(&read_fds);
46     FD_ZERO(&exception_fds);
47 
48     while(1) {
49         memset(buf, 0, sizeof(buf));
50         FD_SET(connfd, &read_fds);
51         FD_SET(connfd, &exception_fds);
52         ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
53         if(ret < 0) {
54             printf("selection failure\\n");
55             break;
56         }
57         if(FD_ISSET(connfd, &read_fds)) {
58             ret = recv(connfd, buf, sizeof(buf), 0);
59             if(ret <= 0) break;
60             printf("get %d bytes of normal data: %s\\n", ret, buf);
61         }
62         memset(buf, 0, sizeof(buf));
63         if(FD_ISSET(connfd, &exception_fds)) {
64             ret = recv(connfd, buf, sizeof(buf), MSG_OOB);
65             if(ret <= 0) break;
66             printf("get %d bytes of oob data: %s\\n", ret, buf);
67         }
68     }
69     close(connfd);
70     close(listenfd);
71     return 0;
72 }

技术分享图片

  客户端我们使用了前面的5-6程序发送普通数据和带外数据。在实际测试过程中发现了一个问题,即原书上的FD_ISSET里面判断是readfd还是exceptionfd那里是if和else if,测试时只输出了普通数据而没有输出带外数据,将客户端发送带外数据后sleep一下就可以正常接收,或者像我上面程序一样使用两个if判断也可以正常接收。分析一下原因,如果是if和else if,则这两个分支只能触发一个,而对面发送的数据是既有普通数据又有带外数据的,所以导致了这个结果,而如果客户端sleep,则服务器端可以返回后重新读取一次带外数据,而改为两个if,就需要判断两次,这样就不会有处理不了带外数据的情况了。

  poll的系统调用和select相似,也是在一段时间内轮询一定数量的文件描述符。当timeout值为-1时poll调用将阻塞直到某个事件发生。

  epoll是Linux特有的I/O复用函数,其实现和select、poll有很大区别。epoll将用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集,但是epoll需要一个额外的文件描述符来标识内核中的这个事件表。与poll不同的是,epoll如果检测到事件,就将所有就绪时间从内核时间表中复制到events指向的数组中,这样就极大提高了应用程序检索就绪文件描述符的效率,从O(n)的时间复杂度降为了O(1)。我们来看一下epoll的几个函数:

1 #include<sys/epoll.h>
2 int epoll_create(int size);
3 int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
4 int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

  创建epoll的函数size参数现在是没有用处的,只是给内核一个提示,告诉它事件表需要多大。操作epoll的函数中op参数指定了操作类型,一共有注册、修改和删除三种,而event参数则描述了事件。

  epoll对于文件描述符的操作有两种模式:LT和ET,其中LT是默认模式,这种模式下其效率相当于一个稍微改进的poll,效率没有显著提高,而ET模式则是epoll的高效工作模式。对于LT模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,因公程序可以不立即处理该事件,这样当epoll_wait再次被触发,还会再向应用程序通告此事件,知道该事件被处理。而对于ET模式,当epoll_wait检测到其上有事件发生,将其通告应用程序,应用程序必须马上处理,因为后续的epoll_wait将不再向应用程序通知这一事件。可见,ET模式降低了同一个epoll事件被重复触发的次数,所以效率更高。我们用一个实例来看一下:

  1 /*************************************************************************
  2     > File Name: 9-3.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sat 03 Feb 2018 10:35:56 PM PST
  6  ************************************************************************/
  7 
  8 #include"head.h"
  9 using namespace std;
 10 
 11 #define MAX_EVENT_NUMBER 1024
 12 #define BUFFER_SIZE 10
 13 
 14 //设置文件描述符为非阻塞模式
 15 int setnonblocking(int fd) {
 16     int old_option = fcntl(fd, F_GETFL);
 17     int new_option = old_option | O_NONBLOCK;
 18     fcntl(fd, F_SETFL, new_option);
 19     return old_option;
 20 }
 21 
 22 //以两种不同模式将事件注册到epoll中
 23 void addfd(int epollfd, int fd, bool enable_et) {
 24     epoll_event event;
 25     event.data.fd = fd;
 26     event.events = EPOLLIN;
 27     if(enable_et) event.events |= EPOLLET;
 28     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 29     setnonblocking(fd);
 30 }
 31 
 32 void lt(epoll_event* events, int number, int epollfd, int listenfd) {
 33     char buf[BUFFER_SIZE];
 34     for(int i = 0; i < number; i ++) {
 35         int sockfd = events[i].data.fd;
 36         if(sockfd == listenfd) {
 37             struct sockaddr_in client_address;
 38             socklen_t client_addrlength = sizeof(client_address);
 39             int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
 40             addfd(epollfd, connfd, false);
 41         }
 42         else if(events[i].events & EPOLLIN) {
 43             printf("event trigger once\\n");
 44             memset(buf, 0, sizeof(buf));
 45             int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
 46             if(ret <= 0) {
 47                 close(sockfd);
 48                 continue;
 49             }
 50             printf("get %d bytes of content: %s\\n", ret, buf);
 51         }
 52         else printf("something else happened\\n");
 53     }
 54 }
 55 
 56 void et(epoll_event* events, int number, int epollfd, int listenfd) {
 57     char buf[BUFFER_SIZE];
 58     for(int i = 0; i < number; i ++) {
 59         int sockfd = events[i].data.fd;
 60         if(sockfd == listenfd) {
 61             struct sockaddr_in client_address;
 62             socklen_t client_addrlength = sizeof(client_address);
 63             int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
 64             addfd(epollfd, connfd, true);
 65         }
 66         else if(events[i].events & EPOLLIN) {
 67             //这段代码不会被重复触发,所以我们循环读取
 68             printf("event trigger once\\n");
 69             while(1) {
 70                 memset(buf, 0, sizeof(buf));
 71                 int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
 72                 if(ret < 0) {
 73                     //非阻塞模式的I/O,当下面的条件成立表示数据已经全部取走
 74                     if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
 75                         printf("read later\\n");
 76                         break;
 77                     }
 78                     close(sockfd);
 79                     break;
 80                 }
 81                 else if(ret == 0) close(sockfd);
 82                 else printf("get %d bytes of content: %s\\n", ret, buf);
 83             }
 84         }
 85         else printf("something else happened\\n");
 86     }
 87 }
 88 
 89 int main(int argc, char** argv) {
 90     if(argc <= 2) {
 91         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
 92         return 1;
 93     }
 94 
 95     const char* ip = argv[1];
 96     int port = atoi(argv[2]);
 97 
 98     int ret = 0;
 99     struct sockaddr_in address;
100     bzero(&address, sizeof(address));
101     address.sin_family = AF_INET;
102     address.sin_port = htons(port);
103     inet_pton(AF_INET, ip, &address.sin_addr);
104 
105     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
106     assert(listenfd >= 0);
107 
108     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
109     assert(ret != -1);
110 
111     ret = listen(listenfd, 5);
112     assert(ret != -1);
113 
114     epoll_event events[MAX_EVENT_NUMBER];
115     int epollfd = epoll_create(5);
116     assert(epollfd != -1);
117     addfd(epollfd, listenfd, true);
118 
119     while(1) {
120         int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
121         if(ret < 0) {
122             printf("epoll failure\\n");
123             break;
124         }
125         //lt(events, ret, epollfd, listenfd);
126         et(events, ret, epollfd, listenfd);
127     }
128     close(listenfd);
129     return 0;
130 }

技术分享图片

技术分享图片

  在上面的实验中,同样发送25个字符,第一个LT工作模式下epoll一共向应用程序通知了三次,而第二种的ET工作模式仅仅通知一次。

  即使我们使用ET模式,那一个socket上的事件也有可能被触发多次,比如一个线程读取socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读,此时另外一个线程来读取这些数据,于是就出现了两个线程同时操作一个socket的情况。而我们期望一个socket连接在任意时刻都只被一个线程处理,这一点我们可以用EPOLLONESHOT事件实现。对于注册了EPOLLONESHOT的文件描述符,操作系统最多触发其上注册的一个可读、可写或异常事件,且只触发一次,除非我们使用epoll_ctl重新注册,这样当一个线程在处理某个socket时,其他线程是不可能有机会操作这个socket的。所以当这个线程操作完毕,应该马上注册EPOLLONESHOT事件以确保其可以再次被触发EPOLLIN事件。用一段代码来看一下:

  1 /*************************************************************************
  2     > File Name: 9-4.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sun 04 Feb 2018 12:24:13 AM PST
  6  ************************************************************************/
  7 
  8 #include"head.h"
  9 using namespace std;
 10 
 11 #define MAX_EVENT_NUMBER 1024
 12 #define BUFFER_SIZE 1024
 13 struct fds {
 14     int epollfd;
 15     int sockfd;
 16 };
 17 
 18 int setnonblocking(int fd) {
 19     int old_option = fcntl(fd, F_GETFL);
 20     int new_option = old_option | O_NONBLOCK;
 21     fcntl(fd, F_SETFL, new_option);
 22     return old_option;
 23 }
 24 
 25 void addfd(int epollfd, int fd, bool enable_et) {
 26     epoll_event event;
 27     event.data.fd = fd;
 28     event.events = EPOLLIN | EPOLLET;
 29     if(enable_et) event.events |= EPOLLONESHOT;
 30     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 31     setnonblocking(fd);
 32 }
 33 
 34 void reset_oneshot(int epollfd, int fd) {
 35     epoll_event event;
 36     event.data.fd = fd;
 37     event.events = EPOLLIN | EPOLLONESHOT | EPOLLET;
 38     epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
 39 }
 40 
 41 void* worker(void* arg) {
 42     int sockfd = ((fds*)arg)->sockfd;
 43     int epollfd = ((fds*)arg)->epollfd;
 44     printf("start new thread to receive data on fd: %d\\n", sockfd);
 45     char buf[BUFFER_SIZE];
 46     memset(buf, 0, sizeof(buf));
 47     while(1) {
 48         int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
 49         if(ret == 0) {
 50             close(sockfd);
 51             printf("foreiner closed the connection\\n");
 52             break;
 53         }
 54         else if(ret < 0) {
 55             if(errno == EAGAIN) {
 56                 reset_oneshot(epollfd, sockfd);
 57                 printf("read later\\n");
 58                 break;
 59             }
 60         }
 61         else {
 62             printf("get content: %s\\n", buf);
 63             memset(buf, 0, sizeof(buf));
 64             sleep(5);
 65         }
 66     }
 67     printf("end thread receiving data on fd: %d\\n", sockfd);
 68 }
 69 
 70 int main(int argc, char** argv) {
 71     if(argc <= 2) {
 72         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
 73         return 1;
 74     }
 75 
 76     const char* ip = argv[1];
 77     int port = atoi(argv[2]);
 78 
 79     int ret = 0;
 80     struct sockaddr_in address;
 81     bzero(&address, sizeof(address));
 82     address.sin_family = AF_INET;
 83     address.sin_port = htons(port);
 84     inet_pton(AF_INET, ip, &address.sin_addr);
 85 
 86     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
 87     assert(listenfd >= 0);
 88 
 89     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
 90     assert(ret != -1);
 91 
 92     ret = listen(listenfd, 5);
 93     assert(ret != -1);
 94 
 95     epoll_event events[MAX_EVENT_NUMBER];
 96     int epollfd = epoll_create(5);
 97     assert(epollfd != -1);
 98     addfd(epollfd, listenfd, false);
 99     
100     while(1) {
101         int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
102         if(ret < 0) {
103             printf("epoll failure\\n");
104             break;
105         }
106         for(int i = 0; i < ret; i ++) {
107             int sockfd = events[i].data.fd;
108             if(sockfd == listenfd) {
109                 struct sockaddr_in client_address;
110                 socklen_t client_addrlength = sizeof(client_address);
111                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
112                 addfd(epollfd, connfd, true);
113             }
114             else if(events[i].events & EPOLLIN) {
115                 pthread_t thread;
116                 fds fds_for_new_worker;
117                 fds_for_new_worker.epollfd = epollfd;
118                 fds_for_new_worker.sockfd = sockfd;
119                 pthread_create(&thread, NULL, worker, (void*)&fds_for_new_worker);
120             }
121             else printf("something else happened\\n");
122         }
123     }
124     close(listenfd);
125     return 0;
126 }

技术分享图片

  代码中我们用sleep(5)来模拟数据处理的过程,当数据在处理时,我们又一次发送了数据,而此时服务器并没有调用另外一个线程继续工作,而是由原线程处理,我们看到两次数据都是由5号线程处理的。这样保证了连接的完整性,从而避免了很多可能的竞态条件。

  具体分析了三种I/O复用模式,我们来对比一下他们的异同与优缺点。这三个函数都能在一定时间内监听一定数量的文件描述符,若一个或多个文件描述符有时间返回时函数返回就绪的文件描述符的个数。select的参数类型fd_set没有把文件描述符和事件绑定,因此我们需要提供这三个集合,而且select也只能监听这三种事件,另一方面内核对fd_set集合会在线修改,所以应用程序下次调用select需要重置三个集合。而poll将所有事件放在了一起,使编程接口更加简洁,且内核每次操作的是pollfd中的revents成员,events并未改变。最后epoll则完全不同,它在内核维护一个内核事件表,每次epoll_wait操作都是直接从该内核事件表中取得用户注册的事件,而无需从用户空间重复读入这些事件。

  另外,poll和epoll可以监听的最大文件描述符数目都能达到系统允许的最大值,即65535,而select由于fd_set的本质是一个整形数组,每一位代表一个文件描述符,所以其能监听的最大数量有限制。

  从效率和原理上来讲,select和poll都工作在低效的LT模式,而且采用轮询的方式,而epoll则不同,他支持高效的ET模式,而且可以支持EPOLLONESHOT事件以进一步减少可读可写和异常事件的触发次数,不仅如此,它采用了回调的方式,内核检测到就绪的文件描述符就触发回调函数,回调函数将该文件描述符上的对应事件插入内核的就绪队列,最后将就绪队列中的内容拷贝到用户空间。但是当活动连接较多时,epoll的效率未必比select和poll高,所以epoll_wait适用于连接数量多但是活动连接少的情况。

  当我们对一个非阻塞的socket调用一个connect,而连接又没有被马上建立,这时,我们可以调用select、poll等函数来监听socket上的可写事件,当函数返回时利用getsockopt读取错误码并清除错误,如果错误码是0则表示连接成功建立,否则失败。我们来看一下这种情况怎么实现:

 1 /*************************************************************************
 2     > File Name: 9-5.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Sun 04 Feb 2018 07:47:09 PM PST
 6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;
10 
11 #define BUFFER_SIZE 1024
12 
13 int setnonblocking(int fd) {
14     int old_option = fcntl(fd, F_GETFL);
15     int new_option = old_option | O_NONBLOCK;
16     fcntl(fd, F_SETFL, new_option);
17     return old_option;
18 }
19 
20 //非阻塞连接,如果函数成功则返回连接的socket,不成功返回-1
21 int unblock_connect(const char* ip, int port, int time) {
22     int ret = 0;
23     struct sockaddr_in address;
24     bzero(&address, sizeof(address));
25     address.sin_family = AF_INET;
26     inet_pton(AF_INET, ip, &address.sin_addr);
27     address.sin_port = htons(port);
28 
29     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
30     int fdopt = setnonblocking(sockfd);
31     ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
32     if(ret == 0) {
33         printf("connect with server immediately\\n");
34         fcntl(sockfd, F_SETFL, fdopt);
35         return sockfd;
36     }
37     else if(errno != EINPROGRESS) {
38         printf("unblock connect not support\\n");
39         return -1;
40     }
41     fd_set readfds;
42     fd_set writefds;
43     struct timeval timeout;
44 
45     FD_ZERO(&readfds);
46     FD_ZERO(&writefds);
47 
48     timeout.tv_sec = time;
49     timeout.tv_usec = 0;
50 
51     ret = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
52     if(ret <= 0) {
53         printf("connection time out\\n");
54         close(sockfd);
55         return -1;
56     }
57     
58     if(!FD_ISSET(sockfd, &writefds)) {
59         printf("no events on sockfd found\\n");
60         close(sockfd);
61         return -1;
62     }
63 
64     int error = 0;
65     socklen_t length = sizeof(error);
66     if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) {
67         printf("get socket option failed\\n");
68         close(sockfd);
69         return -1;
70     }
71 
72     if(errno != 0) {
73         printf("connection failed after select with the error: %d\\n", error);
74         close(sockfd);
75         return -1;
76     }
77 
78     printf("connection ready after select with the socket: %d\\n", sockfd);
79     fcntl(sockfd, F_SETFL, fdopt);
80     return sockfd;
81 }
82 
83 int main(int argc, char** argv) {
84     if(argc <= 2) {
85         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
86         return 1;
87     }
88     const char* ip = argv[1];
89     int port = atoi(argv[2]);
90     int sockfd = unblock_connect(ip, port, 10);
91     if(sockfd < 0) return 1;
92     close(sockfd);
93     return 0;
94 }

  很多服务器要一边处理网络连接一边处理用户输入,比如聊天室程序,这样的就可以用I/O复用来实现,我们以一个poll实现的聊天室程序来举例说明一下:

  首先看下客户端:

 1 /*************************************************************************
 2     > File Name: 9-6.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Sun 04 Feb 2018 08:27:40 PM PST
 6  ************************************************************************/
 7 
 8 #define _GNU_SOURCE 1
 9 #include"head.h"
10 using namespace std;
11 
12 #define BUFFER_SIZE 64
13 
14 int main(int argc, char** argv) {
15     if(argc <= 2) {
16         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
17         return 1;
18     }
19     const char* ip = argv[1];
20     int port = atoi(argv[2]);
21 
22     struct sockaddr_in server_address;
23     bzero(&server_address, sizeof(server_address));
24     server_address.sin_family = AF_INET;
25     server_address.sin_port = htons(port);
26     inet_pton(AF_INET, ip, &server_address.sin_addr);
27 
28     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
29     assert(sockfd >= 0);
30     if(connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0) {
31         printf("connection failed\\n");
32         close(sockfd);
33         return 1;
34     }
35     pollfd fds[2];
36     fds[0].fd = 0;
37     fds[0].events = POLLIN;
38     fds[0].revents = 0;
39     fds[1].fd = sockfd;
40     fds[1].events = POLLIN | POLLRDHUP;
41     fds[1].revents = 0;
42     char read_buf[BUFFER_SIZE];
43     int pipefd[2];
44     int ret = pipe(pipefd);
45     assert(ret != -1);
46 
47     while(1) {
48         ret = poll(fds, 2, -1);
49         if(ret < 0) {
50             printf("poll failure\\n");
51             break;
52         }
53         
54         if(fds[1].revents & POLLRDHUP) {
55             printf("server close the connection\\n");
56             break;
57         }
58         else if(fds[1].revents & POLLIN) {
59             memset(read_buf, 0, sizeof(read_buf));
60             recv(fds[1].fd, read_buf, BUFFER_SIZE, 0);
61             printf("%s\\n", read_buf);
62         }
63 
64         if(fds[0].revents & POLLIN) {
65             ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
66             ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
67         }
68     }
69     close(sockfd);
70     return 0;
71 }

  再看下服务器端:

  1 /*************************************************************************
  2     > File Name: 9-7.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sun 04 Feb 2018 10:35:16 PM PST
  6  ************************************************************************/
  7 
  8 #define _GNU_SOURCE 1
  9 #include"head.h"
 10 using namespace std;
 11 
 12 #define USER_LIMIT 5
 13 #define BUFFER_SIZE 64
 14 #define FD_LIMIT 65535
 15 
 16 struct client_data {
 17     sockaddr_in address;
 18     char* write_buf;
 19     char buf[BUFFER_SIZE];
 20 };
 21 
 22 int setnonblocking(int fd) {
 23     int old_option = fcntl(fd, F_GETFL);
 24     int new_option = old_option | O_NONBLOCK;
 25     fcntl(fd, F_SETFL, new_option);
 26     return old_option;
 27 }
 28 
 29 int main(int argc, char** argv) {
 30     if(argc <= 2) {
 31         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
 32         return 1;
 33     }
 34     const char* ip = argv[1];
 35     int port = atoi(argv[2]);
 36 
 37     int ret = 0;
 38     struct sockaddr_in address;
 39     bzero(&address, sizeof(address));
 40     address.sin_family = AF_INET;
 41     address.sin_port = htons(port);
 42     inet_pton(AF_INET, ip, &address.sin_addr);
 43 
 44     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
 45     assert(listenfd >= 0);
 46     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
 47     assert(ret != -1);
 48 
 49     ret = listen(listenfd, 5);
 50     assert(ret != -1);
 51 
 52     client_data* users = new client_data[FD_LIMIT];
 53     pollfd fds[USER_LIMIT + 1];
 54     int user_counter = 0;
 55     for(int i = 1; i <= USER_LIMIT; i ++) {
 56         fds[i].fd = -1;
 57         fds[i].events = 0;
 58     }
 59     fds[0].fd = listenfd;
 60     fds[0].events = POLLIN | POLLERR;
 61     fds[0].revents = 0;
 62 
 63     while(1) {
 64         ret = poll(fds, user_counter + 1, -1);
 65         if(ret < 0) {
 66             printf("poll failure\\n");
 67             break;
 68         }
 69 
 70         for(int i = 0; i < user_counter + 1;i ++) {
 71             if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) {
 72                 struct sockaddr_in client_address;
 73                 socklen_t client_addrlength = sizeof(client_address);
 74                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
 75 
 76                 if(connfd < 0) {
 77                     printf("errno is: %d\\n", errno);
 78                     continue;
 79                 }
 80 
 81                 if(user_counter >= USER_LIMIT) {
 82                     const char* info = "too many users\\n";
 83                     printf("%s", info);
 84                     send(connfd, info, sizeof(info), 0);
 85                     close(connfd);
 86                     continue;
 87                 }
 88 
 89                 user_counter ++;
 90                 users[connfd].address = client_address;
 91                 setnonblocking(connfd);
 92                 fds[user_counter].fd = connfd;
 93                 fds[user_counter].events = POLLIN | POLLERR | POLLRDHUP;
 94                 fds[user_counter].revents = 0;
 95                 printf("comes a new user, now have %d users\\n", user_counter);
 96             }
 97             else if(fds[i].revents & POLLERR) {
 98                 printf("get an error from %d\\n", fds[i].fd);
 99                 char errors[100];
100                 memset(errors, 0, sizeof(errors));
101                 socklen_t length = sizeof(errors);
102                 if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0) {
103                     printf("get socket option failed\\n");
104                 }
105                 continue;
106             }
107             else if(fds[i].revents &POLLRDHUP) {
108                 users[fds[i].fd] = users[fds[user_counter].fd];
109                 close(fds[i].fd);
110                 fds[i] = fds[user_counter];
111                 user_counter --;
112                 printf("a client left\\n");
113             }
114             else if(fds[i].revents & POLLIN) {
115                 int connfd = fds[i].fd;
116                 memset(users[connfd].buf, \\0, BUFFER_SIZE);
117                 ret = recv(connfd, users[connfd].buf, BUFFER_SIZE, 0);
118                 if(ret < 0) {
119                     if(errno != EAGAIN) {
120                         close(connfd);
121                         users[fds[i].fd] = users[fds[user_counter].fd];
122                         fds[i] = fds[user_counter];
123                         user_counter --;
124                         i --;
125                     }
126                 }
127                 else if(ret == 0) {}
128                 else {
129                     for(int j = 1; j <= user_counter; j ++) {
130                         if(fds[j].fd == connfd) continue;
131                         fds[j].events |= ~POLLIN;
132                         fds[j].events |= POLLOUT;
133                         users[fds[j].fd].write_buf = users[connfd].buf;
134                     }
135                 }
136             }
137             else if(fds[i].revents & POLLOUT) {
138                 int connfd = fds[i].fd;
139                 if(!users[connfd].write_buf) continue;
140                 ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
141                 users[connfd].write_buf = NULL;
142 
143                 fds[i].events |= ~POLLOUT;
144                 fds[i].events |= POLLIN;
145             }
146         }
147     }
148     delete []users;
149     close(listenfd);
150     return 0;
151 }

技术分享图片

  对于这个程序,其难点只有一个,就是当用户退出的时候,我们将整个用户列表的最后一项移动到退出用户的这一项,可以保证前user_counter项都是有客户的。但是对于执行完POLLIN事件后,我们取消注册了POLLIN,转而注册了POLLOUT,在我的理解下可能有些问题。我们考虑一个极端情况,如果取消注册了POLLIN,注册POLLOUT后,主线程转而处理下一个poll事件,但此时刚刚处理过的文件描述符上由发来了数据,而由于此时POLLIN事件已经被取消注册,那么就不会在下一轮轮询中被触发POLLIN。 我将代码中下一步的else if改为了if,这样判断完POLLIN事件后会在判断一次POLLOUT事件,经测试没有问题。但是仍有待商榷,暂留疑问待日后解决。

  最后,I/O复用还可以用于同时监听TCP和UDP服务:

  1 /*************************************************************************
  2     > File Name: 9-8.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Tue 06 Feb 2018 03:23:19 AM PST
  6  ************************************************************************/
  7 
  8 #include"head.h"
  9 using namespace std;
 10 
 11 #define MAX_EVENT_NUMBER 1024
 12 #define TCP_BUFFER_SIZE 512
 13 #define UDP_BUFFER_SIZE 1024
 14 
 15 int setnonblocking(int fd) {
 16     int old_option = fcntl(fd, F_GETFL);
 17     int new_option = old_option | O_NONBLOCK;
 18     fcntl(fd, F_SETFL, new_option);
 19     return old_option;
 20 }
 21 
 22 void addfd(int epollfd, int fd) {
 23     epoll_event event;
 24     event.data.fd = fd;
 25     event.events = EPOLLIN | EPOLLET;
 26     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 27     setnonblocking(fd);
 28 }
 29 
 30 int main(int argc, char** argv) {
 31     if(argc <= 2) {
 32         printf("usage: %s ip_address port_number\\n", basename(argv[0]));
 33         return 1;
 34     }
 35     const char* ip = argv[1];
 36     int port = atoi(argv[2]);
 37 
 38     int ret = 0;
 39     struct sockaddr_in address;
 40     bzero(&address, sizeof(address));
 41     address.sin_family = AF_INET;
 42     inet_pton(AF_INET, ip, &address.sin_addr);
 43     address.sin_port = htons(port);
 44 
 45     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
 46     assert(listenfd >= 0);
 47 
 48     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
 49     assert(ret != -1);
 50 
 51     ret = listen(listenfd, 5);
 52     assert(ret != -1);
 53 
 54     bzero(&address, sizeof(address));
 55     address.sin_family = AF_INET;
 56     inet_pton(AF_INET, ip, &address.sin_addr);
 57     address.sin_port = htons(port);
 58     int udpfd = socket(AF_INET, SOCK_DGRAM, 0);
 59     assert(udpfd >= 0);
 60 
 61     ret = bind(udpfd, (struct sockaddr*)&address, sizeof(address));
 62     assert(ret != -1);
 63     epoll_event events[MAX_EVENT_NUMBER];
 64     int epollfd = epoll_create(5);
 65     assert(epollfd != -1);
 66 
 67     addfd(epollfd, listenfd);
 68     addfd(epollfd, udpfd);
 69 
 70     while(1) {
 71         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
 72         if(number < 0) {
 73             printf("epoll failure\\n");
 74             break;
 75         }
 76         for(int i = 0; i < number; i ++) {
 77             int sockfd = events[i].data.fd;
 78             if(sockfd == listenfd) {
 79                 struct sockaddr_in client_address;
 80                 socklen_t client_addrlength = sizeof(client_address);
 81                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
 82                 addfd(epollfd, connfd);
 83             }
 84             else if(sockfd == udpfd) {
 85                 char buf[UDP_BUFFER_SIZE];
 86                 memset(buf, 0, UDP_BUFFER_SIZE);
 87                 struct sockaddr_in client_address;
 88                 socklen_t client_addrlength = sizeof(client_address);
 89 
 90                 ret = recvfrom(udpfd, buf, UDP_BUFFER_SIZE, 0, (struct sockaddr*)&client_address, &client_addrlength);
 91                 if(ret > 0) {
 92                     sendto(udpfd, buf, UDP_BUFFER_SIZE, 0, (struct sockaddr*)&client_address, client_addrlength);
 93                 }
 94             }
 95             else if(events[i].events & EPOLLIN) {
 96                 char buf[TCP_BUFFER_SIZE];
 97                 while(1) {
 98                     memset(buf, 0, TCP_BUFFER_SIZE);
 99                     ret = recv(sockfd, buf, TCP_BUFFER_SIZE, 0);
100                     if(ret < 0) {
101                         if((errno == EAGAIN) || (errno == EWOULDBLOCK)) break;
102                         close(sockfd);
103                         break;
104                     }
105                     else if(ret == 0) {
106                         close(sockfd);
107                     }
108                     else send(sockfd, buf, ret, 0);
109                 }
110             }
111         }
112     }
113     close(listenfd);
114     return 0;
115 }

 

以上是关于《Linux高性能服务器编程》学习总结——I/O复用的主要内容,如果未能解决你的问题,请参考以下文章

《Linux高性能服务器编程》学习总结——高性能服务器程序框架

《Linux高性能服务器编程》学习总结——定时器

linux高性能服务器编程 --I/O复用

Linux高性能服务编程(I/O复用)

Linux网络编程基于TCP流 I/O多路转接(poll) 的高性能http服务器

Linux高性能服务器编程I/O复用的高级应用