异步I/O事件处理

Posted sip-inaction

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步I/O事件处理相关的知识,希望对你有一定的参考价值。

异步I/O

  所谓异步I/O,是指以事件触发的机制来对I/O操作进行处理;与多进程和多线程技术相比,异步I/O技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而减少了系统的开销。

 

 1 /**
 2   client.c
 3 */
 4 #include <stdio.h>
 5 #include <unistd.h>
 6 #include <string.h>
 7 #include <strings.h>
 8 #include <sys/types.h>
 9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <arpa/inet.h>
12 
13 #define PORT 8111
14 #define MSG_LEN 1024
15 
16 int main(int argc, char* argv[])
17 {
18     if (argc != 2) {
19         printf("usage: ./%s <ipaddress>
", argv[0]);
20         return 1;
21     }
22     
23     int ret = -1;
24     int socket_fd;
25     socklen_t addr_len;
26     struct sockaddr_in serv_addr;
27     char send_buf[MSG_LEN] = { 0 };
28     char recv_buf[MSG_LEN] = { 0 };
29 
30     socket_fd = socket(AF_INET, SOCK_STREAM, 0);
31     if (socket_fd < 0) {
32         printf("create socket fd failed.
");
33         _exit(-1);
34     }
35 
36     bzero(&serv_addr, sizeof(serv_addr));
37     serv_addr.sin_family = AF_INET;
38     serv_addr.sin_port = htons(PORT);
39     serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
40     addr_len = sizeof(struct sockaddr);
41     ret = connect(socket_fd, (struct sockaddr *)&serv_addr, addr_len);
42     if (ret < 0) {
43         printf("connect to server[%s:%d] failed.
", argv[1], PORT);
44         _exit(-1);
45     }
46 
47     while (1) {
48         memset(send_buf, 0, sizeof(send_buf));
49         memset(recv_buf, 0, sizeof(recv_buf));
50         gets(send_buf);
51         
52         if (strncmp(send_buf, "quit", 4) == 0)
53             break;
54 
55         ret = send(socket_fd, (void *)send_buf, strlen(send_buf), 0);
56         if (ret < 0) {
57             printf("send msg[%s] failed.
", send_buf);
58             break;
59         }
60         
61         ret = recv(socket_fd, (void *)recv_buf, sizeof(recv_buf), 0);
62         if (ret < 0) {
63             printf("receive zero msg.
");
64             continue;
65         }
66 
67         recv_buf[ret] = ;
68         printf("recv msg[%s]
", recv_buf);
69     }
70 
71     close(socket_fd);
72 
73     return 0;
74 }
75 
76 # gcc -std=c11 -g -o client client.c
77 # ./client <ipaddress>

 

1.fork

  1.每收到一个连接就创建一个子进程

  2.父进程负责接收连接

  3.通过fork创建子进程

 1 /**
 2   fork.c
 3 */
 4 
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
12 
13 #define PORT 8111
14 #define MSG_LEN 1024
15 
16 int main(int argc, char* argv[])
17 {
18     int ret = -1;
19     int on = 1;
20     int backlog = 10;
21     pid_t pid;
22     int socket_fd, accept_fd;
23     struct sockaddr_in    local_addr, remote_addr;
24     char buff[MSG_LEN] = { 0 };
25 
26     socket_fd = socket(AF_INET, SOCK_STREAM, 0);
27     if (socket_fd < 0) {
28         printf("create socket fd failed.
");
29         _exit(-1);
30     }
31 
32     ret = setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
33     if (ret < 0) {
34         printf("set socket option failed.
");
35     }
36 
37     bzero(&local_addr, sizeof(local_addr));
38     local_addr.sin_family = AF_INET;
39     local_addr.sin_port = htons(PORT);
40     local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
41     ret = bind(socket_fd, (struct sockaddr *)&local_addr, sizeof(struct sockaddr));
42     if (ret < 0) {
43         printf("bind port[%d] failed.
", PORT);
44         _exit(-1);
45     }
46 
47     ret = listen(socket_fd, backlog);
48     if (ret < 0) {
49         printf("listen socket[%d] port[%d] failed.
", socket_fd, PORT);
50         _exit(-1);
51     }
52     printf("listening on port[%d]...
", PORT);
53 
54     while (1) {
55         socklen_t addr_len = sizeof(struct sockaddr);
56         accept_fd = accept(socket_fd, (struct sockaddr *)&remote_addr, &addr_len);
57         printf("connection[%d] estasblished...
", accept_fd);
58 
59         pid_t pid = fork();
60         if (pid == 0) {
61             while (1) {
62                 memset((void *)buff, 0, sizeof(buff));
63                 ret = recv(accept_fd, (void *)buff, MSG_LEN, 0);
64                 if (ret == 0)
65                     break;
66 
67                 buff[ret] = ;
68                 printf("recv[%s]
", buff);
69                 send(accept_fd, (void *)buff, strlen(buff), 0);
70             }
71 
72             close(accept_fd);
73         } else {
74 
75         }
76     }
77 
78     if (pid != 0)
79         close(socket_fd);
80 
81     return 0;
82 }
83 
84 # gcc -std=c11 -g -o fork fork.c
85 # ./fork

   存在问题:

  1.资源长期被占用

  2.创建子进程耗时

 

2.select

  1.遍历文件描述符集中的所有描述符,找出有变化的描述符

  2.对于侦听的socket和数据处理的socket要区别对待

  3.socket必须设置为非阻塞的方式工作

  4.FD_ZERO()/FD_SET()/FD_ISSET()/FD_CLR()

  5.int fcntl(int fd, int cmd, ... /* arg */ );  // flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK);

  6. int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  1 /**
  2   select.c 
  3 */
  4 
  5 /**
  6  * struct timeval {
  7  *     long tv_sec;
  8  *     long tv_usec;
  9  * }
 10  */
 11 
 12 #include <stdio.h>
 13 #include <errno.h>
 14 #include <unistd.h>
 15 #include <fcntl.h>
 16 #include <string.h>
 17 #include <strings.h>
 18 #include <sys/types.h>
 19 #include <sys/time.h>
 20 #include <sys/socket.h>
 21 #include <netinet/in.h>
 22 #include <sys/select.h>
 23 
 24 #define PORT 8111
 25 #define MSG_LEN 1024
 26 #define FD_SIZE 1024
 27 
 28 int main(int argc, char* argv[])
 29 {
 30     int ret = -1;
 31     int on = 1;
 32     int backlog = 10;
 33     int flags;
 34     int events = 0;
 35     int max_fd = -1;
 36     fd_set fd_sets;
 37     struct timeval time_out = {3, 0};
 38     int accept_fds[FD_SIZE] = { 0 };
 39     memset(accept_fds, -1, sizeof(accept_fds));
 40     int socket_fd;
 41     struct sockaddr_in    local_addr, remote_addr;
 42     char buff[MSG_LEN] = { 0 };
 43 
 44     socket_fd = socket(AF_INET, SOCK_STREAM, 0);
 45     if (socket_fd < 0) {
 46         printf("create socket fd failed.
");
 47         _exit(-1);
 48     }
 49 
 50     max_fd = socket_fd;
 51 
 52     flags = fcntl(socket_fd, F_GETFL, 0);
 53     fcntl(socket_fd, F_SETFL, flags | O_NONBLOCK);
 54 
 55     ret = setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
 56     if (ret < 0) {
 57         printf("set socket option failed.
");
 58     }
 59 
 60     bzero(&local_addr, sizeof(local_addr));
 61     local_addr.sin_family = AF_INET;
 62     local_addr.sin_port = htons(PORT);
 63     local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
 64     ret = bind(socket_fd, (struct sockaddr *)&local_addr, sizeof(struct sockaddr));
 65     if (ret < 0) {
 66         printf("bind port[%d] failed.
", PORT);
 67         _exit(-1);
 68     }
 69 
 70     ret = listen(socket_fd, backlog);
 71     if (ret < 0) {
 72         printf("listen socket[%d] port[%d] failed.
", socket_fd, PORT);
 73         _exit(-1);
 74     }
 75     printf("listening on port[%d]...
", PORT);
 76 
 77     while (1) {
 78         FD_ZERO(&fd_sets);
 79         FD_SET(socket_fd, &fd_sets);
 80         max_fd = socket_fd;
 81         /*for (const int& accept_fd : accept_fds) {
 82             if (accept_fd == -1)
 83                 continue;
 84 
 85             if (accept_fd > max_fd)
 86                 max_fd = accept_fd;
 87 
 88             printf("accept fd:%d
", accept_fd);
 89             FD_SET(accept_fd, &fd_sets);
 90         }*/
 91         for (int i = 0; i < FD_SIZE; ++i) {
 92             if (accept_fds[i] == -1)
 93                 continue;
 94 
 95             if (accept_fds[i] > max_fd)
 96                 max_fd = accept_fds[i];
 97 
 98             FD_SET(accept_fds[i], &fd_sets);
 99         }
100 
101         time_out.tv_sec = 2;       // 2 seconds
102         time_out.tv_usec = 500000; // 500 micro second
103         events = select(max_fd + 1, &fd_sets, NULL, NULL, &time_out);
104         if (events < 0) {
105             printf("select[%d] errno[%d|%s]
", events, errno, strerror(errno));
106             for (int i = 0; i < FD_SIZE; ++i) {
107                 if (accept_fds[i] == -1)
108                     continue;
109 
110                 if (FD_ISSET(accept_fds[i], &fd_sets)) {
111                     char buf[MSG_LEN] = { 0 };
112                     int n = read(accept_fds[i], buf, MSG_LEN);
113                     if (n <= 0 && errno == EBADF) {  // errno important
114                         printf("n:%d], fd:%d], errno[%d|%s]
", n, accept_fds[i], errno, strerror(errno));
115                         close(accept_fds[i]);
116                         FD_CLR(accept_fds[i], &fd_sets);
117                         accept_fds[i] = -1;
118                     }
119                 }
120             }
121             continue;
122         } else if (events == 0) {
123             printf("select timeout
");
124             continue;
125         } else if (events > 0) {
126             if (FD_ISSET(socket_fd, &fd_sets)) {
127                 int curpos = -1;
128                 for (int i = 0; i < FD_SIZE; ++i) {
129                     if (accept_fds[i] == -1) {
130                         curpos = i;
131                         break;
132                     }
133                 }
134 
135                 if (curpos != -1) {
136                     socklen_t addr_len = sizeof(struct sockaddr);
137                     int accept_fd = accept(socket_fd, (struct sockaddr *)&remote_addr, &addr_len);
138                     printf("connection[%d] estasblished...
", accept_fd);
139                     
140                     flags = fcntl(accept_fd, F_GETFL, 0);
141                     fcntl(accept_fd, F_SETFL, flags | O_NONBLOCK);
142                     accept_fds[curpos] = accept_fd;
143                 } else {
144                     printf("fd slots fills.
");
145                 }
146             }
147 
148             for (int i = 0; i < FD_SIZE; ++i) {
149                 if (accept_fds[i] != -1 && FD_ISSET(accept_fds[i], &fd_sets)) {
150                     memset(buff, 0, sizeof(buff));
151                     ret = recv(accept_fds[i], (void *)buff, sizeof(buff), 0);
152                     if (ret == 0) {
153                         close(accept_fds[i]);
154                         continue;
155                     }
156 
157                     printf("recv[%s]
", buff);
158                     send(accept_fds[i], (void *)buff, strlen(buff), 0);
159                 }
160             }
161         }
162     }
163 
164     close(socket_fd);
165 
166     return 0;
167 }
168 
169 # gcc -std=c11 -g -o select select.c
170 # ./select

 

3.poll

  1.#include <poll.h>

  2.int poll(struct pollfd *fds, nfds_t nfds, int timeout);

  3.POLLIN/POLLOUT/POLLERR

  1 /**
  2   poll.c
  3 */
  4 
  5 /**
  6  * struct pollfd {
  7  *     int fd;
  8  *     short events;
  9  *     short revents;
 10  * };
 11  */
 12 
 13 #include <stdio.h>
 14 #include <unistd.h>
 15 #include <string.h>
 16 #include <strings.h>
 17 #include <sys/types.h>
 18 #include <sys/socket.h>
 19 #include <netinet/in.h>
 20 #include <poll.h>
 21 #include <errno.h>
 22 #include <limits.h>
 23 
 24 #define PORT 8111
 25 #define MSG_LEN 1024
 26 #define OPEN_MAX 1024
 27 #define INFTIM -1
 28 
 29 int main(int argc, char* argv[])
 30 {
 31     int ret = -1;
 32     int on = 1;
 33     int backlog = 10;
 34     int maxi = -1;
 35     int socket_fd, accept_fd;
 36     struct sockaddr_in    local_addr, remote_addr;
 37     char buff[MSG_LEN] = { 0 };
 38     struct pollfd client[OPEN_MAX];
 39 
 40     socket_fd = socket(AF_INET, SOCK_STREAM, 0);
 41     if (socket_fd < 0) {
 42         printf("create socket fd failed.
");
 43         _exit(-1);
 44     }
 45 
 46     ret = setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
 47     if (ret < 0) {
 48         printf("set socket option failed.
");
 49     }
 50 
 51     bzero(&local_addr, sizeof(local_addr));
 52     local_addr.sin_family = AF_INET;
 53     local_addr.sin_port = htons(PORT);
 54     local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
 55     ret = bind(socket_fd, (struct sockaddr *)&local_addr, sizeof(struct sockaddr));
 56     if (ret < 0) {
 57         printf("bind port[%d] failed.
", PORT);
 58         _exit(-1);
 59     }
 60 
 61     ret = listen(socket_fd, backlog);
 62     if (ret < 0) {
 63         printf("listen socket[%d] port[%d] failed.
", socket_fd, PORT);
 64         _exit(-1);
 65     }
 66     printf("listening on port[%d]...
", PORT);
 67 
 68     client[0].fd = socket_fd;
 69     client[0].events = POLLIN;
 70     for (int i = 1; i < OPEN_MAX; ++i) {
 71         client[i].fd = -1;
 72     }
 73     maxi = 0;
 74 
 75     while (1) {
 76         ret = poll(client, maxi + 1, INFTIM);
 77         if (client[0].revents & POLLIN) {
 78             socklen_t addr_len = sizeof(struct sockaddr);
 79             accept_fd = accept(socket_fd, (struct sockaddr *)&remote_addr, &addr_len);
 80             int i = -1;
 81             for (i = 1; i < OPEN_MAX; ++i) {
 82                 if (client[i].fd < 0) {
 83                     client[i].fd = accept_fd;
 84                     break;
 85                 }
 86             }
 87 
 88             if (i == OPEN_MAX) {
 89                 printf("connection[%d] fill
", OPEN_MAX);
 90                 continue;
 91             }
 92 
 93             client[i].events = POLLIN;
 94             if (i > maxi) 
 95                 maxi = i;
 96                 
 97             printf("connection[%d] estasblished...
", accept_fd);
 98         }
 99 
100         for (int i = 1; i < OPEN_MAX; ++i) {
101             if (client[i].fd < 0)
102                 continue;
103 
104             if (client[i].revents & (POLLIN | POLLERR)) {
105                 ret = read(client[i].fd, buff, sizeof(buff));
106                 if (ret == 0 || (ret < 0 && errno == EBADF)) {
107                     close(client[i].fd);
108                     client[i].fd = -1;
109                 } else {
110                     buff[ret] = ;
111                     printf("recv[%s]
", buff);
112                     write(client[i].fd, (void *)buff, strlen(buff));
113                 }
114             }
115         }
116     }
117 
118     close(socket_fd);
119 
120     return 0;
121 }
122 
123 # gcc -std=c11 -g -o poll poll.c
124 # ./poll

 

  select和poll模型都存在的一个问题就是需要剔除异常的socket文件描述符,否则程序会不停报错。

以上是关于异步I/O事件处理的主要内容,如果未能解决你的问题,请参考以下文章

010 异步I/O处理 002

010 异步I/O处理 003

Node.js中的异步I/O是如何进行的?

请教nginx是怎么处理静态文件的

node.js的异步I/O事件驱动单线程

如何优雅的处理Nodejs中的异步回调