Linux高性能server编程——I/O复用
Posted yangykaifa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux高性能server编程——I/O复用相关的知识,希望对你有一定的参考价值。
IO复用
I/O复用使得程序能同一时候监听多个文件描写叙述符。通常网络程序在下列情况下须要使用I/O复用技术:
-
client程序要同一时候处理多个socket
-
client程序要同一时候处理用户输入和网络连接
-
TCPserver要同一时候处理监听socket和连接socket,这是I/O复用使用最多的场合
-
server要同一时候处理TCP请求和UDP请求。比方本章将要讨论的会社server
-
server要同一时候监听多个port。或者处理多种服务。
I/O复用尽管能同一时候监听多个文件描写叙述符,但它本身是堵塞的。而且当多个文件描写叙述符同一时候就绪时,假设不採用额外措施,程序就仅仅能按顺序依次处理当中的每个文件描写叙述符,这使得server程序看起来像是串行工作。
假设要实现并发,仅仅能使用多进程或多线程等变成手段。
select系统复用
select系统调用的用途是:在一段指定时间内。监听用户感兴趣的文件描写叙述符上的可读可写和异常等事件。
#include <sys/select.h>
int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
-
nfds參数指定被监听的文件描写叙述符的总数。
通常被设置为select监听的全部文件描写叙述符中的最大值加1,由于文件描写叙述符是从0開始计数的
-
readfds, writefds和exceptfds參数分别指向可读、可写和异常等事件相应的文件描写叙述符集合。
fd_set结构体仅包括一个整形数组。高数组的每一个元素的每一位标记一个文件描写叙述符。
可用例如以下宏来訪问fd_set结构体中的位:
voidFD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
voidFD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set*set);
-
timeout參数用来设置select函数的超时时间。它是一个timeval指针。timeval结构体定义例如以下:
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
假设给timeout传递NULL。则select将一直堵塞,直到某个文件描写叙述符就绪。
select成功时返回就绪文件描写叙述符的总数,假设在超时时间内没有不论什么文件描写叙述符就绪返回0,失败返回-1,并设置errno;假设select在等待期间收到信号,则select马上返回-1,并设置errno为EINTR。
poll系统调用
poll系统调用和select类似,也是在指定时间内伦旭一定数量的文件描写叙述符。以測试当中是否有就绪。
poll原型例如以下:
#include<poll.h>
int poll(structpollfd *fds, nfds_t nfds, int timeout);
1)fds參数是一个pollfd结构类型的数组,它指定所以我们感兴趣的文件描写叙述符上发生的刻度、可写和异常等时间。其结构定义例如以下:
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
当中fd成员指定文件描写叙述符;events成员告诉poll监听f上的那些时间,它是一系列时间的按位或;revents成员则由内核改动,以通知应用程序fd上实际发生了哪些事件。
2)nfds參数指定被监听事件集合的大小。其类型nfds_t定义例如以下:
typedef unsignedlong int nfds_t;
-
timeout參数指定poll的超时时间。单位是毫秒。当timeout为-1时。poll调用将永远堵塞,直到某个事件发生;当为0时,poll调用马上返回。
poll返回值含义与select同样。
epoll系列系统调用
内核事件表
epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll有非常大差异。首先,epoll使用一组函数来完毕任务。而不是单个函数。
其次,epoll把用户关心的文件描写叙述符上的时间放在内核里的一个时间表中。从而无需向select和poll那样每次调用都要反复传入文件描写叙述符集或事件集。但epoll须要使用一个额外的文件描写叙述符,来唯一标识内核中的这个时间表。这个文件描写叙述符使用例如以下epoll_create函数创建:
#include <sys/epoll.h>
int epoll_create(int size);
size參数给内核一个提示,告诉它时间表须要多大。该函数返回的文件描写叙述符将作用其它全部epoll系统调用的第一个參数,以指定要訪问的内核事件表。
以下的函数用来操作epoll的内核事件表:
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
fd參数是要操作的文件描写叙述符。op參数则制定操作类型,操作类型有例如以下3种:
EPOLL_CTL_ADD:往事件表中注冊fd上的事件
EPOLL_CTL_MOD:改动fd上的注冊事件
EPOLL_CTL_DEL:删除fd上的注冊事件
event參数指定时间,它是epoll_event结构指针类型。epoll_event的定义例如以下:
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
当中events成员描写叙述事件类型。data成员用于存储用户数据。其类型epoll_data的定义例如以下:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
epoll_data_t是一个联合体,当中4个成员中使用最多的是fd,它指定事件所丛书的目标文件描写叙述符。
epoll_ctl成功时返回0,失败时返回-1并设置errno。
epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描写叙述符上的事件,其原型例如以下:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
该函数成功时返回就绪的文件描写叙述符的个数,失败是返回-1,并设置errno。
maxevents參数指定最多监听多少时间,必须大于0.
epoll_wait函数假设检測到事件。就将全部就绪的事件从内核事件表中拷贝到它的第二个參数events指向的数组中。这个数组仅仅用于输出epoll_wait检測到的就绪时间,而不像select和poll数组那样即用于传入用户注冊的时间,实用于输出内核检測到的就绪时间。这就极大的提高了应用程序索引就绪文件描写叙述符的效率。
以下的代码体现了这个区别:
/*怎样索引poll返回的就绪文件描写叙述符*/
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
/*必须遍历全部注冊文件描写叙述符并找到当中的就绪着*/
for(int i=0;i<MAX_EVENT_NUMBER; ++i)
{
if(fds[i].revents & POLLIN)
{
int sockfd = fds[i].fd;
/*处理sockfd*/
}
}
/*怎样索引epoll返回的就绪文件描写叙述符*/
int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);
/*遍历就绪的ret个文件描写叙述符*/
for( int i=0;i<ret; i++)
{
int socketfd = events[i].data.fd;
/*socket肯定就绪。直接处理*/
}
LT和ET模式
epoll对文件描写叙述符的操作有两种模式:LT模式(Levek Trigger,电平触发)和ET模式(E多个Trigger,边沿触发)。
LT模式是默认的工作模式,这样的模式下epoll相当于一个效率较高的poll。
当往epoll内核事件表中注冊一个文件描写叙述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描写叙述符。ET模式是epoll的搞笑工作模式。
对于採用LT工作模式的文件描写叙述符。当epoll_wait检測到其上有时间发生并将此事件通知应用程序后,应用程序能够不马上处理该事件。这样。当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于採用ET工作模式的文件描写叙述符,当epoll_wait检測到其上有时间发生并将此时间通知应用程序后。应用程序必须马上处理该事件,由于兴许的epoll_wait调用将不再向用用程序通知这一事件。可见。ET在非常大程度上减少了同一个epoll事件被反复触发的次数,因此效率比LT模式高。
文章最后的程序清单1比較了两种模式:
当在clienttelnet传输“abcdefghijklmnopqrstuvwxyz”字符串时。输出例如以下
ET模式输出:
event trigger once
get 9 bytes of content: abcdefghi
get 9 bytes of content: jklmnopqr
get 9 bytes of content: stuvwxyz
get 1 bytes of content:
LT模式输出:
event trigger once
get 9 bytes of content: abcdefghi
event trigger once
get 9 bytes of content: jklmnopqr
event trigger once
get 9 bytes of content: stuvwxyz
event trigger once
get 1 bytes of content:
能够看到正如我们预期,ET模式下时间仅仅被触发一次,要比LT模式下少非常多。
EPOLLONESHOT事件
即使我们使用ET模式。一个socket上的某个事件还是可能被触发多次。这在并发程序中会引起一个问题。
比方一个县城在读取完某个socket上的数据后開始处理这些数据,二在数据的处理project中该socket上又有新数据可读。此时另外一个县城北唤醒来读取这些新的数据。
于是就出现了两个线程同一时候操作一个socket的局面。这当然不是我们期望的。
我们期望的是一个socket连接在任一时刻都仅仅被一个线程处理。这一点能够使用spoll的EPOLLONESHOT事件实现。
对于注冊了EPOLLONESHOT事件的文件描写叙述符,操作系统最多触发其上注冊的一个可读、可写或者异常事件,并且仅仅触发一次。除非我们使用epoll_ctl函数重置该文件描写叙述符上注冊的EPOLLONESHOT事件ain.zheyang。当一个线程在处理某个socket时。其它线程是不可能有机会操作该socket的。
但反过来思考,注冊了EPOLLONESHOT事件的socket一旦被某个线程处理完成,该线程就应该马上重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其它工作线程有机会处理这个socket。
程序清单2展示了EPOLLONESHOT事件的使用。
三组I/O复用函数的比較
系统调用 |
select |
poll |
epoll |
事件集合 |
用户通过3个參数分别传入感兴趣的可读、可写及异常等事件。内核通过对这些參数在线改动来反馈当中的就绪事件。这使得用户每次调用select都要重置这3个參数 |
统一处理全部事件类型,因此仅仅须要一个事件集參数。用户通过pollfd.events传入感兴趣的事件,内核通过改动pollfd.revents反馈当中就绪的事件 |
内核通过一个时间表直接管理用户感兴趣的全部事件。因此每次调用epoll_wait时,无需重复传入用户感兴趣的时间。 epoll_wait系统调用的參数events仅用来反馈就绪的事件。 |
应用程序索引就绪文件描写叙述符的时间复杂度 |
O(N) |
O(N) |
O(1) |
最大支持文件描写叙述符数 |
一般有最大值限制 |
65535 |
65535 |
工作模式 |
LT |
LT |
支持ET高效模式 |
内核实现和工作效率 |
採用轮询方法来检測就绪事件,算法复杂度为O(N) |
採用轮询方式检測就绪事件,算法复杂度为O(N) |
採用回调方式来检測就绪事件。算法复杂度为O(1) |
聊天程序见程序(poll实现)见清单3
同一时候处理TCP和UDP服务的回射server程序(epoll程序)见清单4
程序清单1: #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10 int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd, bool enable_et ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN; if( enable_et ) { event.events |= EPOLLET; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } void lt( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, false ); } else if ( events[i].events & EPOLLIN ) { printf( "event trigger once\n" ); memset( buf, ‘\0‘, BUFFER_SIZE ); int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret <= 0 ) { close( sockfd ); continue; } printf( "get %d bytes of content: %s\n", ret, buf ); } else { printf( "something else happened \n" ); } } } void et( epoll_event* events, int number, int epollfd, int listenfd ) { char buf[ BUFFER_SIZE ]; for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, true ); } else if ( events[i].events & EPOLLIN ) { printf( "event trigger once\n" ); while( 1 ) { memset( buf, ‘\0‘, BUFFER_SIZE ); int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret < 0 ) { if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { printf( "read later\n" ); break; } close( sockfd ); break; } else if( ret == 0 ) { close( sockfd ); } else { printf( "get %d bytes of content: %s\n", ret, buf ); } } } else { printf( "something else happened \n" ); } } } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd, true ); while( 1 ) { int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret < 0 ) { printf( "epoll failure\n" ); break; } lt( events, ret, epollfd, listenfd ); //et( events, ret, epollfd, listenfd ); } close( listenfd ); return 0; }
程序清单2 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 1024 struct fds { int epollfd; int sockfd; }; int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd, bool oneshot ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; if( oneshot ) { event.events |= EPOLLONESHOT; } epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } void reset_oneshot( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ); } void* worker( void* arg ) { int sockfd = ( (fds*)arg )->sockfd; int epollfd = ( (fds*)arg )->epollfd; printf( "start new thread to receive data on fd: %d\n", sockfd ); char buf[ BUFFER_SIZE ]; memset( buf, ‘\0‘, BUFFER_SIZE ); while( 1 ) { int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 ); if( ret == 0 ) { close( sockfd ); printf( "foreiner closed the connection\n" ); break; } else if( ret < 0 ) { if( errno == EAGAIN ) { reset_oneshot( epollfd, sockfd ); printf( "read later\n" ); break; } } else { printf( "get content: %s\n", buf ); sleep( 5 ); } } printf( "end thread receiving data on fd: %d\n", sockfd ); } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd, false ); while( 1 ) { int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( ret < 0 ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < ret; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd, true ); } else if ( events[i].events & EPOLLIN ) { pthread_t thread; fds fds_for_new_worker; fds_for_new_worker.epollfd = epollfd; fds_for_new_worker.sockfd = sockfd; pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker ); } else { printf( "something else happened \n" ); } } } close( listenfd ); return 0; }
程序清单3 客户端程序 #define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <poll.h> #include <fcntl.h> #define BUFFER_SIZE 64 int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); struct sockaddr_in server_address; bzero( &server_address, sizeof( server_address ) ); server_address.sin_family = AF_INET; inet_pton( AF_INET, ip, &server_address.sin_addr ); server_address.sin_port = htons( port ); int sockfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( sockfd >= 0 ); if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 ) { printf( "connection failed\n" ); close( sockfd ); return 1; } pollfd fds[2]; fds[0].fd = 0; fds[0].events = POLLIN; fds[0].revents = 0; fds[1].fd = sockfd; fds[1].events = POLLIN | POLLRDHUP; fds[1].revents = 0; char read_buf[BUFFER_SIZE]; int pipefd[2]; int ret = pipe( pipefd ); assert( ret != -1 ); while( 1 ) { ret = poll( fds, 2, -1 ); if( ret < 0 ) { printf( "poll failure\n" ); break; } if( fds[1].revents & POLLRDHUP ) { printf( "server close the connection\n" ); break; } else if( fds[1].revents & POLLIN ) { memset( read_buf, ‘\0‘, BUFFER_SIZE ); recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 ); printf( "%s\n", read_buf ); } if( fds[0].revents & POLLIN ) { ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); } } close( sockfd ); return 0; } 服务器程序 #define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <poll.h> #define USER_LIMIT 5 #define BUFFER_SIZE 64 #define FD_LIMIT 65535 struct client_data { sockaddr_in address; char* write_buf; char buf[ BUFFER_SIZE ]; }; int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); client_data* users = new client_data[FD_LIMIT]; pollfd fds[USER_LIMIT+1]; int user_counter = 0; for( int i = 1; i <= USER_LIMIT; ++i ) { fds[i].fd = -1; fds[i].events = 0; } fds[0].fd = listenfd; fds[0].events = POLLIN | POLLERR; fds[0].revents = 0; while( 1 ) { ret = poll( fds, user_counter+1, -1 ); if ( ret < 0 ) { printf( "poll failure\n" ); break; } for( int i = 0; i < user_counter+1; ++i ) { if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); if ( connfd < 0 ) { printf( "errno is: %d\n", errno ); continue; } if( user_counter >= USER_LIMIT ) { const char* info = "too many users\n"; printf( "%s", info ); send( connfd, info, strlen( info ), 0 ); close( connfd ); continue; } user_counter++; users[connfd].address = client_address; setnonblocking( connfd ); fds[user_counter].fd = connfd; fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR; fds[user_counter].revents = 0; printf( "comes a new user, now have %d users\n", user_counter ); } else if( fds[i].revents & POLLERR ) { printf( "get an error from %d\n", fds[i].fd ); char errors[ 100 ]; memset( errors, ‘\0‘, 100 ); socklen_t length = sizeof( errors ); if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 ) { printf( "get socket option failed\n" ); } continue; } else if( fds[i].revents & POLLRDHUP ) { users[fds[i].fd] = users[fds[user_counter].fd]; close( fds[i].fd ); fds[i] = fds[user_counter]; i--; user_counter--; printf( "a client left\n" ); } else if( fds[i].revents & POLLIN ) { int connfd = fds[i].fd; memset( users[connfd].buf, ‘\0‘, BUFFER_SIZE ); ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 ); printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd ); if( ret < 0 ) { if( errno != EAGAIN ) { close( connfd ); users[fds[i].fd] = users[fds[user_counter].fd]; fds[i] = fds[user_counter]; i--; user_counter--; } } else if( ret == 0 ) { printf( "code should not come to here\n" ); } else { for( int j = 1; j <= user_counter; ++j ) { if( fds[j].fd == connfd ) { continue; } fds[j].events |= ~POLLIN; fds[j].events |= POLLOUT; users[fds[j].fd].write_buf = users[connfd].buf; } } } else if( fds[i].revents & POLLOUT ) { int connfd = fds[i].fd; if( ! users[connfd].write_buf ) { continue; } ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 ); users[connfd].write_buf = NULL; fds[i].events |= ~POLLOUT; fds[i].events |= POLLIN; } } } delete [] users; close( listenfd ); return 0; }
程序清单4 回射服务器程序 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #define MAX_EVENT_NUMBER 1024 #define TCP_BUFFER_SIZE 512 #define UDP_BUFFER_SIZE 1024 int setnonblocking( int fd ) { int old_option = fcntl( fd, F_GETFL ); int new_option = old_option | O_NONBLOCK; fcntl( fd, F_SETFL, new_option ); return old_option; } void addfd( int epollfd, int fd ) { epoll_event event; event.data.fd = fd; //event.events = EPOLLIN | EPOLLET; event.events = EPOLLIN; epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event ); setnonblocking( fd ); } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int listenfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( listenfd >= 0 ); ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); ret = listen( listenfd, 5 ); assert( ret != -1 ); bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int udpfd = socket( PF_INET, SOCK_DGRAM, 0 ); assert( udpfd >= 0 ); ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) ); assert( ret != -1 ); epoll_event events[ MAX_EVENT_NUMBER ]; int epollfd = epoll_create( 5 ); assert( epollfd != -1 ); addfd( epollfd, listenfd ); addfd( epollfd, udpfd ); while( 1 ) { int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 ); if ( number < 0 ) { printf( "epoll failure\n" ); break; } for ( int i = 0; i < number; i++ ) { int sockfd = events[i].data.fd; if ( sockfd == listenfd ) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength ); addfd( epollfd, connfd ); } else if ( sockfd == udpfd ) { char buf[ UDP_BUFFER_SIZE ]; memset( buf, ‘\0‘, UDP_BUFFER_SIZE ); struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength ); if( ret > 0 ) { sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength ); } } else if ( events[i].events & EPOLLIN ) { char buf[ TCP_BUFFER_SIZE ]; while( 1 ) { memset( buf, ‘\0‘, TCP_BUFFER_SIZE ); ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 ); if( ret < 0 ) { if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ) { break; } close( sockfd ); break; } else if( ret == 0 ) { close( sockfd ); } else { send( sockfd, buf, ret, 0 ); } } } else { printf( "something else happened \n" ); } } } close( listenfd ); return 0; }
以上是关于Linux高性能server编程——I/O复用的主要内容,如果未能解决你的问题,请参考以下文章