LINUX网络编程 IO 复用
Posted itdef
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LINUX网络编程 IO 复用相关的知识,希望对你有一定的参考价值。
参考《linux高性能服务器编程》
LINUX下处理多个连接时候,仅仅使用多线程和原始socket函数,效率十分低下
于是就出现了select、poll、epoll等IO复用函数。
这里讨论性能最优的epoll IO复用
用户将需要关注的socket连接使用IO复用函数放进一个事件表中,每当事件表中有一个或者多个SOCKET连接出现读写请求时候,则进行处理
事件表使用一个额外的文件描述符来标识。文件描述符使用 epoll_create函数创建
#include <sys/epoll.h>
int epoll_create(int size); size参数目前不起作用
操作事件表使用epoll_ctl函数进行控制
操作类型有以下3种
EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL
在一段超时时间内等待事件表上的事件 使用epoll_wait();
函数范例如下:
/*
 * Edge-triggered (EPOLLET) epoll demo: a single-threaded TCP server that
 * prints whatever its clients send.
 *
 * Usage: <program> ip_address port_number
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <libgen.h>
#include <stdbool.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/*
 * Switch fd to non-blocking mode.
 * Returns the previous file status flags, or -1 if fcntl() failed.
 */
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    if (old_option < 0) {
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) < 0) {
        return -1;
    }
    return old_option;
}

/*
 * Register fd for EPOLLIN on epollfd (edge-triggered when enable_et is true)
 * and make fd non-blocking. Failures are reported on stderr.
 */
void addfd(int epollfd, int fd, bool enable_et)
{
    struct epoll_event event;

    memset(&event, 0, sizeof event);
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et) {
        event.events |= EPOLLET;
    }
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0) {
        perror("epoll_ctl");
    }
    if (setnonblocking(fd) < 0) {
        perror("fcntl");
    }
}

/*
 * Accept every connection currently queued on listenfd.
 * The listen socket is edge-triggered, so one event may stand for several
 * pending connections; accepting only one would strand the others.
 */
static void accept_pending(int epollfd, int listenfd)
{
    for (;;) {
        struct sockaddr_in client_address;
        socklen_t client_addrlength = sizeof client_address;
        int connfd = accept(listenfd, (struct sockaddr *)&client_address,
                            &client_addrlength);

        if (connfd < 0) {
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("accept");
            }
            return;
        }
        addfd(epollfd, connfd, true);
    }
}

/*
 * Read sockfd until it would block (required in edge-triggered mode),
 * printing each chunk. Closes the socket on EOF or on a hard error.
 */
static void drain_socket(int sockfd)
{
    char buf[BUFFER_SIZE];

    printf("event trigger once\n");
    for (;;) {
        ssize_t ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);

        if (ret > 0) {
            buf[ret] = '\0';
            printf("get %d bytes of content: %s\n", (int)ret, buf);
            continue;
        }
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                printf("read later\n");
                return;
            }
            perror("recv");
        }
        /* EOF or hard error: stop here so the closed fd is never reused. */
        close(sockfd);
        return;
    }
}

/*
 * Dispatch the first `number` entries of `events` returned by epoll_wait():
 * new connections on listenfd are accepted, readable clients are drained.
 */
void et(struct epoll_event *events, int number, int epollfd, int listenfd)
{
    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;

        if (sockfd == listenfd) {
            accept_pending(epollfd, listenfd);
        } else if (events[i].events & EPOLLIN) {
            drain_socket(sockfd);
        } else {
            printf("something else happened \n");
        }
    }
}

int main(int argc, char *argv[])
{
    if (argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }

    const char *ip = argv[1];
    char *end = NULL;
    errno = 0;
    long port = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' || port < 0 || port > 65535) {
        fprintf(stderr, "invalid port number: %s\n", argv[2]);
        return 1;
    }

    struct sockaddr_in address;
    memset(&address, 0, sizeof address);
    address.sin_family = AF_INET;
    address.sin_port = htons((unsigned short)port);
    if (inet_pton(AF_INET, ip, &address.sin_addr) != 1) {
        fprintf(stderr, "invalid IPv4 address: %s\n", ip);
        return 1;
    }

    /* Explicit checks instead of assert(): they must survive -DNDEBUG. */
    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        return 1;
    }
    if (bind(listenfd, (struct sockaddr *)&address, sizeof address) == -1) {
        perror("bind");
        close(listenfd);
        return 1;
    }
    if (listen(listenfd, 5) == -1) {
        perror("listen");
        close(listenfd);
        return 1;
    }

    int epollfd = epoll_create(5);
    if (epollfd == -1) {
        perror("epoll_create");
        close(listenfd);
        return 1;
    }
    addfd(epollfd, listenfd, true);

    struct epoll_event events[MAX_EVENT_NUMBER];
    for (;;) {
        int ready = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

        if (ready < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, not a real failure */
            }
            printf("epoll failure\n");
            break;
        }
        et(events, ready, epollfd, listenfd);
    }

    close(epollfd);
    close(listenfd);
    return 0;
}
再尝试一个多线程版本的代码:
/*
 * Multi-threaded epoll demo WITHOUT EPOLLONESHOT: every EPOLLIN event on a
 * client socket starts a new worker thread. Because the socket stays armed,
 * a new edge on the same socket spawns another worker while an earlier one
 * is still running, so several threads may read one socket concurrently.
 *
 * Listens on 127.0.0.1:1134.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <stdbool.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Arguments handed to a worker thread; heap-allocated, freed by the worker. */
struct fds
{
    int epollfd;
    int sockfd;
};

/*
 * Thread entry point. `arg` is a malloc'ed struct fds whose ownership passes
 * to this thread. Reads the socket until it would block, EOF, or an error.
 * Always returns NULL.
 */
void *worker(void *arg)
{
    /* The cast keeps this valid when built as C++, as the original was. */
    struct fds *ctx = (struct fds *)arg;
    int sockfd = ctx->sockfd;
    int epollfd = ctx->epollfd;
    char buf[BUFFER_SIZE];

    free(ctx);
    (void)epollfd; /* unused here: this variant never re-arms the socket */

    printf("start new thread to receive data on fd: %d\n", sockfd);
    for (;;) {
        ssize_t ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);

        if (ret == 0) {
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                printf("read later\n");
                break;
            }
            perror("recv");
            /* EBADF means another worker already closed this socket; closing
             * again could hit an unrelated descriptor that reused the number. */
            if (errno != EBADF) {
                close(sockfd);
            }
            break;
        }
        buf[ret] = '\0'; /* terminate at the bytes just read, no stale tail */
        printf("get content: %s\n", buf);
        sleep(5); /* simulate slow processing */
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
    return NULL;
}

/*
 * Switch fd to non-blocking mode.
 * Returns the previous file status flags, or -1 if fcntl() failed.
 */
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    if (old_option < 0) {
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) < 0) {
        return -1;
    }
    return old_option;
}

/*
 * Register fd for EPOLLIN on epollfd (edge-triggered when enable_et is true)
 * and make fd non-blocking. Failures are reported on stderr.
 */
void addfd(int epollfd, int fd, bool enable_et)
{
    struct epoll_event event;

    memset(&event, 0, sizeof event);
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et) {
        event.events |= EPOLLET;
    }
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0) {
        perror("epoll_ctl");
    }
    if (setnonblocking(fd) < 0) {
        perror("fcntl");
    }
}

/*
 * Accept every connection currently queued on the edge-triggered listenfd
 * and register each one with epollfd.
 */
static void accept_pending(int epollfd, int listenfd)
{
    for (;;) {
        struct sockaddr_in client_address;
        socklen_t client_addrlength = sizeof client_address;
        int connfd = accept(listenfd, (struct sockaddr *)&client_address,
                            &client_addrlength);

        if (connfd < 0) {
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("accept");
            }
            return;
        }
        addfd(epollfd, connfd, true);
    }
}

/*
 * Start a detached worker thread for sockfd.
 * The arguments live on the heap because the thread may start running after
 * this function has returned. Returns 0 on success, -1 on failure.
 */
static int spawn_worker(int epollfd, int sockfd)
{
    struct fds *ctx = (struct fds *)malloc(sizeof *ctx);
    pthread_t thread;
    int rc;

    if (ctx == NULL) {
        perror("malloc");
        return -1;
    }
    ctx->epollfd = epollfd;
    ctx->sockfd = sockfd;

    rc = pthread_create(&thread, NULL, worker, ctx);
    if (rc != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        free(ctx);
        return -1;
    }
    pthread_detach(thread); /* nobody joins; let the thread free itself */
    return 0;
}

/*
 * Dispatch the first `number` entries of `events` returned by epoll_wait():
 * new connections are accepted, readable clients get a worker thread each.
 */
void et(struct epoll_event *events, int number, int epollfd, int listenfd)
{
    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;

        if (sockfd == listenfd) {
            accept_pending(epollfd, listenfd);
        } else if (events[i].events & EPOLLIN) {
            (void)spawn_worker(epollfd, sockfd);
        } else {
            printf("something else happened \n");
        }
    }
}

int main(void)
{
    struct sockaddr_in address;

    memset(&address, 0, sizeof address);
    address.sin_family = AF_INET;
    address.sin_port = htons(1134);
    if (inet_pton(AF_INET, "127.0.0.1", &address.sin_addr) != 1) {
        fprintf(stderr, "inet_pton failed\n");
        return 1;
    }

    /* Explicit checks instead of assert(): they must survive -DNDEBUG. */
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        return 1;
    }
    if (bind(listenfd, (struct sockaddr *)&address, sizeof address) == -1) {
        perror("bind");
        close(listenfd);
        return 1;
    }
    if (listen(listenfd, 5) == -1) {
        perror("listen");
        close(listenfd);
        return 1;
    }

    int epollfd = epoll_create(5);
    if (epollfd == -1) {
        perror("epoll_create");
        close(listenfd);
        return 1;
    }
    addfd(epollfd, listenfd, true);

    struct epoll_event events[MAX_EVENT_NUMBER];
    for (;;) {
        int ready = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

        if (ready < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, not a real failure */
            }
            printf("epoll failure\n");
            break;
        }
        et(events, ready, epollfd, listenfd);
    }

    close(epollfd);
    close(listenfd);
    return 0;
}
多线程版本中 如果我们 连续多次输入会出现一个问题
如果我们连续多次在telnet的客户端输入
服务端会开启多个线程 接受这个socket的输入
显示如下:
fd: 5, sockfdget content: dfdsf
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
如果多个线程同时操作一个socket 可能出现各种意外情况
那么我们就需要设置socket的EPOLLONESHOT标志
对于注册了EPOLLONESHOT的socket,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次;处理完成后需要用epoll_ctl(EPOLL_CTL_MOD)重新设置EPOLLONESHOT,该socket上的事件才会再次被触发。
代码如下:
/*
 * Multi-threaded epoll demo WITH EPOLLONESHOT: each client socket is armed
 * for a single event, handled by one worker thread, and re-armed by that
 * worker once it has drained the socket. At most one thread therefore
 * services a given socket at any time.
 *
 * Listens on 127.0.0.1:1134.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <stdbool.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

/* Arguments handed to a worker thread; heap-allocated, freed by the worker. */
struct fds
{
    int epollfd;
    int sockfd;
};

/*
 * Re-arm fd on epollfd for one more edge-triggered EPOLLIN event.
 * A one-shot descriptor stays disabled after it fires until this is called.
 */
void reset_oneshot(int epollfd, int fd)
{
    struct epoll_event event;

    memset(&event, 0, sizeof event);
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) < 0) {
        perror("epoll_ctl");
    }
}

/*
 * Thread entry point. `arg` is a malloc'ed struct fds whose ownership passes
 * to this thread. Reads the socket until it would block (then re-arms it),
 * or until EOF / a hard error (then closes it). Always returns NULL.
 */
void *worker(void *arg)
{
    /* The cast keeps this valid when built as C++, as the original was. */
    struct fds *ctx = (struct fds *)arg;
    int sockfd = ctx->sockfd;
    int epollfd = ctx->epollfd;
    char buf[BUFFER_SIZE];

    free(ctx);

    printf("start new thread to receive data on fd: %d\n", sockfd);
    for (;;) {
        ssize_t ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);

        if (ret == 0) {
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        if (ret < 0) {
            if (errno == EINTR) {
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                /* Socket drained: hand it back to epoll for the next event. */
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
            /* Hard error: this thread is the sole owner, so close the socket
             * instead of spinning on a recv() that keeps failing. */
            perror("recv");
            close(sockfd);
            break;
        }
        buf[ret] = '\0'; /* terminate at the bytes just read, no stale tail */
        printf("get content: %s\n", buf);
        sleep(5); /* simulate slow processing */
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
    return NULL;
}

/*
 * Switch fd to non-blocking mode.
 * Returns the previous file status flags, or -1 if fcntl() failed.
 */
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    if (old_option < 0) {
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) < 0) {
        return -1;
    }
    return old_option;
}

/*
 * Register fd for edge-triggered EPOLLIN on epollfd, adding EPOLLONESHOT
 * when setOneShot is true, and make fd non-blocking.
 * Failures are reported on stderr.
 */
void addfd(int epollfd, int fd, bool setOneShot)
{
    struct epoll_event event;

    memset(&event, 0, sizeof event);
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if (setOneShot) {
        event.events |= EPOLLONESHOT;
    }
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0) {
        perror("epoll_ctl");
    }
    if (setnonblocking(fd) < 0) {
        perror("fcntl");
    }
}

/*
 * Accept every connection currently queued on the edge-triggered listenfd
 * and register each one as a one-shot descriptor.
 */
static void accept_pending(int epollfd, int listenfd)
{
    for (;;) {
        struct sockaddr_in client_address;
        socklen_t client_addrlength = sizeof client_address;
        int connfd = accept(listenfd, (struct sockaddr *)&client_address,
                            &client_addrlength);

        if (connfd < 0) {
            if (errno == EINTR || errno == ECONNABORTED) {
                continue;
            }
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                perror("accept");
            }
            return;
        }
        addfd(epollfd, connfd, true);
    }
}

/*
 * Start a detached worker thread for sockfd.
 * The arguments live on the heap because the thread may start running after
 * this function has returned. Returns 0 on success, -1 on failure.
 */
static int spawn_worker(int epollfd, int sockfd)
{
    struct fds *ctx = (struct fds *)malloc(sizeof *ctx);
    pthread_t thread;
    int rc;

    if (ctx == NULL) {
        perror("malloc");
        return -1;
    }
    ctx->epollfd = epollfd;
    ctx->sockfd = sockfd;

    rc = pthread_create(&thread, NULL, worker, ctx);
    if (rc != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        free(ctx);
        return -1;
    }
    pthread_detach(thread); /* nobody joins; let the thread free itself */
    return 0;
}

/*
 * Dispatch the first `number` entries of `events` returned by epoll_wait():
 * new connections are accepted, readable clients get one worker thread.
 */
void et(struct epoll_event *events, int number, int epollfd, int listenfd)
{
    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;

        if (sockfd == listenfd) {
            accept_pending(epollfd, listenfd);
        } else if (events[i].events & EPOLLIN) {
            if (spawn_worker(epollfd, sockfd) != 0) {
                /* No worker will ever re-arm this one-shot socket, so drop
                 * the connection rather than leak a dead descriptor. */
                close(sockfd);
            }
        } else {
            printf("something else happened \n");
        }
    }
}

int main(void)
{
    struct sockaddr_in address;

    memset(&address, 0, sizeof address);
    address.sin_family = AF_INET;
    address.sin_port = htons(1134);
    if (inet_pton(AF_INET, "127.0.0.1", &address.sin_addr) != 1) {
        fprintf(stderr, "inet_pton failed\n");
        return 1;
    }

    /* Explicit checks instead of assert(): they must survive -DNDEBUG. */
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        return 1;
    }
    if (bind(listenfd, (struct sockaddr *)&address, sizeof address) == -1) {
        perror("bind");
        close(listenfd);
        return 1;
    }
    if (listen(listenfd, 5) == -1) {
        perror("listen");
        close(listenfd);
        return 1;
    }

    int epollfd = epoll_create(5);
    if (epollfd == -1) {
        perror("epoll_create");
        close(listenfd);
        return 1;
    }
    /* The listen socket must NOT be one-shot, or only the first burst of
     * connections would ever be accepted. */
    addfd(epollfd, listenfd, false);

    struct epoll_event events[MAX_EVENT_NUMBER];
    for (;;) {
        int ready = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);

        if (ready < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, not a real failure */
            }
            printf("epoll failure\n");
            break;
        }
        et(events, ready, epollfd, listenfd);
    }

    close(epollfd);
    close(listenfd);
    return 0;
}
输入
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
1
2
3
4
5
sdfufghbskdjhkbaskjhbkjlsadvfblkajwsvdfbljkashdbkfjhasdfhasjkhD
输出
start new thread to receive data on fd: 5
get content: 1
get content: 2
3
4
get content: 5
sdfufg
get content: hbskdjhkb
get content: askjhbkjl
get content: sadvfblka
get content: jwsvdfblj
get content: kashdbkfj
get content: hasdfhasj
get content: khD
hasj
read later
end thread receiving data on fd: 5
以上是关于LINUX网络编程 IO 复用的主要内容,如果未能解决你的问题,请参考以下文章