epoll实现socket通信

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了epoll实现socket通信相关的知识,希望对你有一定的参考价值。

 epoll是Linux特有的I/O复用函数,它在实现和使用上与select和poll有很大差异。epoll使用一组函数来完成任务,而不是单个函数。epoll把用户关心的文件描述符上的事件放在内核的一个事件表中,无需像select和poll那样每次调用都要重复传入文件描述符集或事件集,但epoll需要一个额外的文件描述符,来唯一标示内核中的这个事件表,这个文件描述符使用epoll_create函数来创建。

  epoll是一种高效的管理socket的模型,相对于select和poll来说具有更高的效率和易用性。epoll的性能不会随socket数量的增加而下降。

下面我们来说说epoll的使用:

  epoll所使用的数据结构如下:

技术分享


   结构体epoll_event被用于注册感兴趣的事件和回传所发生待处理的事件,epoll_data联合体用来保存触发事件的某个文件描述符相关的数据。例如一个client连接到服务器,服务器通过调用accept函数可以得到这个client对应的socket文件描述符,可以把这个文件描述符赋给epoll_data的fd字段,以便以后的读写操作在这个文件描述符上进行。epoll_data_t是一个联合体,其中4个成员最多用的就是fd,它指定事件所从属的目标文件描述符,ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t是一个联合体,我们不能同时使用其ptr成员和fd成员,因此我们要将文件描述符和用户数据关联起来,以实现快熟的数据访问,只能使用其他手段,我们在下面的程序中自定义了个结构体,里面有我们所关心的fd和保存用户数据的buf。

  events字段是表示感兴趣的事件,被触发的事件的可能取值为:

      EPOLLIN:表示对应的文件描述符可以读;

      EPOLLOUT:表示对应的文件描述符可以写;

      EPOLLPRI:表示对应的文件描述符有紧急的数据可读;

      EPOLLERR:表示对应的文件描述符发生错误;

      EPOLLHUP:表示对应的文件描述符被挂断;

      EPOLLET:表示将EPOLL设置为边缘触发模式;

      EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

所用到的函数有如下三个:

  1.epoll_create函数:

     原型:int epoll_create(int size)

     该函数生成一个epoll专用的文件描述符,size参数指定生成描述符的最大范围。size参数现在并不起作用,使用红黑树来管理所有的文件描述符,该函数返回的文件描述符将用作其他所有的epoll系统调用的第一个参数,

  2.epoll_ctl函数:

     原型:int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event)

     该函数用于控制某个文件描述符上的事件,可以注册事件,修改事件,删除事件。

     参数:epfd:由epoll_create生成的epoll专用的文件描述符

          op:要进行的操作,可能的取值有:

              EPOLL_CTL_ADD 注册

              EPOLL_CTL_MOD 修改

              EPOLL_CTL_DEL 删除

          fd:关联的文件描述符

          event:指向epoll_event的指针

    调用成功返回0,失败返回-1;

 3.epoll_wait函数:

      原型:int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout)

      该函数用于轮询I/O事件的发生,调用成功时返回就绪的文件描述符的个数,失败时返回-1,并设置errno。该函数如果检测到事件,就将所有就绪的事件从内核表中(由epfd参数指定)复制到它的第二个参数events指向的数组中,这个数组只用于输出epoll_wait检测到的就绪事件。

     参数:

        epfd:由epoll_create生成的epoll专用的文件描述符

        events:用于回传待处理的数组

        maxevent:每次能处理的事件数

        timeout:与poll接口的timeout参数相同,是超时时间,0会立即返回,-1是永久阻塞。

     如果该函数调用成功,返回对应I/O上已准备好的文件描述符数目,如果返回0表示已超时。

 

接下来我们来说说epoll的工作原理:

  epoll同样只告诉那些已就绪的文件描述符,而且当我们调用epoll_wait获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,我们只需去epoll指定的数组中依次取得相应数量的文件描述符即可,这里使用内存映射技术,节省了系统调用时的开销。两一个本质的改进在与epoll采用基于事件的就绪通知方式,在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl来注册一个文件描述符,一旦某个文件描述符就绪时,内核会采用回调机制,循序激活这个文件描述符,当进程调用epoll_wait时便得到通知。

 

LT模式和ET模式:

  epoll对于文件描述符的操作有两种模式,LT(水平触发)和ET(边缘触发)。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。

  对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序告知该事件,直到该事件被处理。而采用ET模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件,。这样,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率比LT模式要高。

 

我们在下面的编程中还用到了一个fcntl函数,该函数原型如下:

  int fcntl(int fd,int cmd,...)

  该函数可以执行各种描述符控制操作,它提供的与网络编程相关的特性如下:

    1.非阻塞式I/O。通过使用F_SETFL命令设置O_NONBLOCK文件状态标志,我们可以把一个套接字设置为非阻塞。

    2.信号驱动式I/O。通过使用F_SETFL命令设置O_ASYNC文件状态标志,我们可以把一个套接字设置成一旦其状态发生变化,内核就产生一个SIGIO信号。

    3.F_SETOWN命令允许我们指定用于接收SIGIO和SIGURG信号的套接字属主。其中SIGIO信号时套接字被设置为信号驱动式I/O型后产生的,SIGURG信号时在新的带外数据到达套接字时产生的。F_GETOWN返回套接字的当前属主。


接下来我们看看基于epoll的socket编程代码:(基于LT模式下的阻塞模式)

客户端给服务端发送消息。服务端回显给客户端:

  server端:

#include <stdio.h>
  #include <stdlib.h>
  #include <unistd.h>
  #include <assert.h>
  #include <string.h>
  #include <arpa/inet.h>
  #include <netinet/in.h>
  #include <sys/epoll.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include<errno.h>
  #include<fcntl.h>
  #define _BACKLOG_ 5
  #define _BUF_SIZE_ 10240
  #define _MAX_ 64
  
  typedef struct _data_buf
  {
      int fd;
      char buf[_BUF_SIZE_];
  }data_buf_t,*data_buf_p;
   static void usage(const char* proc)
  {
      printf("usage:%s[ip][port]\n",proc);
  }
  
  static int start(int port,char *ip)
  {
      assert(ip);
      int sock=socket(AF_INET,SOCK_STREAM,0);
      if(sock<0)
      {
          perror("socket");
          exit(1);
      }
  
      struct sockaddr_in local;
      local.sin_port=htons(port);
      local.sin_family=AF_INET;
      local.sin_addr.s_addr=inet_addr(ip);
  
      int opt=1;  //设置为接口复用
      setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
  
      if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
      {
          perror("bind");
          exit(2);
      }
  
      if(listen(sock,_BACKLOG_)<0)
      {
          perror("listen");
          exit(3);
      }
      return sock;
  }
 
  static int epoll_server(int listen_sock)
  {
      int epoll_fd=epoll_create(256);//生成一个专用的epoll文件描述符
      if(epoll_fd<0)
      {
          perror("epoll_create");
          exit(1);
      }

     struct epoll_event ev;//用于注册事件
     struct epoll_event ret_ev[_MAX_];//数组用于回传要处理的事件
     int ret_num=_MAX_;
     int read_num=-1;
     ev.events=EPOLLIN;
     ev.data.fd=listen_sock;
     if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ev)<0)//用于控制某个文件描述符上的事件(注册,修改,删除)
     {
         perror("epoll_ctl");
         return -2;
     }
 
     int done=0;
     int i=0;
     int timeout=5000;
     struct sockaddr_in client;
     socklen_t len=sizeof(client);
     while(!done)
     {
         switch(read_num=epoll_wait(epoll_fd,ret_ev,ret_num,timeout))//用于轮寻I/O事件的发生
         {
             case0:                                                                          
                 printf("time out\n");
                 break;
             case -1:
                 perror("epoll");
                 exit(2);
             default:
                 {
                     for(i=0;i<read_num;++i)
                     {
                         if(ret_ev[i].data.fd==listen_sock&&(ret_ev[i].events&EPOLLIN))
                         {
 
                             int fd=ret_ev[i].data.fd;
                             int new_sock=accept(fd,(struct sockaddr*)&client,&len);
                             if(new_sock<0)
                             {
                                 perror("accept");
                                 continue;
                             }
 
                             ev.events=EPOLLIN;
                             ev.data.fd=new_sock;
                             epoll_ctl(epoll_fd,EPOLL_CTL_ADD,new_sock,&ev);
                             printf("get a new client...\n");
                         }
                         else  //normal sock
                         {
                             if(ret_ev[i].events&EPOLLIN)
                             {
                                 int fd=ret_ev[i].data.fd;
                                 data_buf_p mem=(data_buf_p)malloc(sizeof(data_buf_t));
                                 if(!mem)
                 perror("malloc");                                                                      
                                     continue;
                                 }
                                 mem->fd=fd;
                                 memset(mem->buf,‘\0‘,sizeof(mem->buf));
                                 ssize_t _s=read(mem->fd,mem->buf,sizeof(mem -> buf)-1);
                                 if(_s>0)
                                 {
                                     mem->buf[_s-1]=‘\0‘;
                                     printf("client: %s\n",mem->buf);
                                     ev.events=EPOLLOUT;
                                     ev.data.ptr=mem;
                                     epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
                                 }
                                 else if(_s==0)
                                 {
                                     printf("client close...\n");
                                     epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
                                     close(fd);
                                     free(mem);
                                 }
                                 else
                                 {
                                     continue;
                                 }
                             }
                             else if(ret_ev[i].events&EPOLLOUT)  //写事件准备就绪
                             {
                                     data_buf_p mem=(data_buf_p)ret_ev[i].data.ptr;
                                     int fd=mem->fd;
                                     char *buf=mem->buf;
                                     write(fd,buf,strlen(buf));
                                     ev.events=EPOLLIN;    //写完,下次关心读事件
                                     ev.data.fd=fd;
                                     epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
                             }
                             else{
                                 //....
                             }
                         }
                     }
                 }
                 break;
         }
     }
 
 }
 
 int main(int argc,char* argv[])
 {
     if(argc!=3)
     {
         usage(argv[0]);
         return 1;
     }
 
     int port=atoi(argv[2]);
     char *ip=argv[1];
 
     int listen_sock=start(port,ip);
     epoll_server(listen_sock);
     close(listen_sock);
     return 0;
 }

client端:

   

#include <stdio.h>
   #include <stdlib.h>
   #include <assert.h>
   #include <poll.h>
   #include <string.h>
   #include <unistd.h>
   #include <netinet/in.h>
   #include <sys/types.h>
   #include <sys/socket.h>
  #include <arpa/inet.h>
  
  static void usage(const char* arg)
  {
      printf("usage:%s [ip][port]",arg);
  }
  
  int main(int argc,char *argv[])
  {
      if(argc!=3)
      {
          usage(argv[0]);
          exit(1);
      }
  
      int port=atoi(argv[2]);
      char *ip=argv[1];
  
      int sock=socket(AF_INET,SOCK_STREAM,0);
      if(sock<0)
      {
          perror("socket");                                                                                                                                           
          exit(2);
      }
  
      struct sockaddr_in remote;
      remote.sin_family=AF_INET;
                      remote.sin_port=htons(port);
      remote.sin_addr.s_addr=inet_addr(ip);
  
      int ret=connect(sock,(struct sockaddr*)&remote,sizeof(remote));                                                                                                 
  
      char buf[1024];
      while(1)
      {
          printf("please enter: ");
          fflush(stdout);
          ssize_t _s=read(0,buf,sizeof(buf)-1);
          buf[_s]=‘\0‘;
          write(sock,buf,sizeof(buf)-1);
          memset(buf,‘\0‘,sizeof(buf));
          read(sock,buf,sizeof(buf)-1);
          printf("echo:%s\n",buf);
      }
      return 0;
  }

 运行结果:

技术分享             

我们可以看到,客户端发给服务端的数据,被服务端收到后,回显给客户端。


接下来我们把程序改为ET模式非阻塞模式:

主要改的地方有:

       1.因为ET模式只通知一次,所以我们在读取数据的时候必须一次读完,我们写的read_data函数就是实现这个功能的;

       2.把所有的描述符都改为非阻塞模式,调用我们的set_no_block函数;

       3.注册事件的时候,要与上EPOLLET;

具体如下:

server端:


  

 #include <stdio.h>
  #include <stdlib.h>
  #include <unistd.h>
  #include <assert.h>
  #include <string.h>
  #include <arpa/inet.h>
  #include <netinet/in.h>
  #include <sys/epoll.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include<errno.h>
  #include<fcntl.h>
  #define _BACKLOG_ 5
  #define _BUF_SIZE_ 10240
  #define _MAX_ 64
  
  typedef struct _data_buf
  {
      int fd;
      char buf[_BUF_SIZE_];
  }data_buf_t,*data_buf_p;
   static void usage(const char* proc)
  {
      printf("usage:%s[ip][port]\n",proc);
  }
  
  static int set_no_block(int fd) //用来设置非阻塞
  {
      int old_fl=fcntl(fd,F_GETFL);
      if(old_fl<0)
      {
          perror("perror");
          return -1;
      }
      if(fcntl(fd,F_SETFL,old_fl|O_NONBLOCK))
      {
          perror("fcntl");
          return -1;
      }
  
      return 0;
  }
     
  int read_data(int fd,char* buf,int size)//ET模式下读取数据,必须一次读完
  {
      assert(buf);                                                                                                                                                    
      int index=0;
      ssize_t _s=-1;
      while((_s=read(fd,buf+index,size-index))<size)
      {
          if(errno==EAGAIN)
          {
              break;
          }
         index += _s;
      }
      return index;
  }
  
  static int start(int port,char *ip)
  {
      assert(ip);
      int sock=socket(AF_INET,SOCK_STREAM,0);
      if(sock<0)
      {
          perror("socket");
          exit(1);
      }

      struct sockaddr_in local;
      local.sin_port=htons(port);
      local.sin_family=AF_INET;
      local.sin_addr.s_addr=inet_addr(ip);
  
      int opt=1;  //设置为接口复用
      setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
  
      if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
      {
          perror("bind");
          exit(2);
      }
  
      if(listen(sock,_BACKLOG_)<0)
      {
          perror("listen");
          exit(3);
      }
      return sock;
  }

  static int epoll_server(int listen_sock)
  {
      int epoll_fd=epoll_create(256);//生成一个专用的epoll文件描述符
      if(epoll_fd<0)
      {
          perror("epoll_create");
          exit(1);
      }
     set_no_block(listen_sock);//设置监听套接字为非阻塞
     struct epoll_event ev;//用于注册事件
     struct epoll_event ret_ev[_MAX_];//数组用于回传要处理的事件
     int ret_num=_MAX_;
     int read_num=-1;
     ev.events=EPOLLIN|EPOLLET;
     ev.data.fd=listen_sock;
     if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ev)<0)//用于控制某个文件描述符上的事件(注册,修改,删除)
     {
         perror("epoll_ctl");
         return -2;
     }

     int done=0;
     int i=0;
     int timeout=5000;
     struct sockaddr_in client;
     socklen_t len=sizeof(client);
     while(!done)
     {
         switch(read_num=epoll_wait(epoll_fd,ret_ev,ret_num,timeout))//用于轮寻I/O事件的发生
         {
             case0:                                                                          
                 printf("time out\n");
                 break;
             case -1:
                 perror("epoll");
                 exit(2);
             default:
                 {
                     for(i=0;i<read_num;++i)
                     {
                         if(ret_ev[i].data.fd==listen_sock&&(ret_ev[i].events&EPOLLIN))
                         {

                             int fd=ret_ev[i].data.fd;
                             int new_sock=accept(fd,(struct sockaddr*)&client,&len);
                             if(new_sock<0)
                             {
                                 perror("accept");
                                 continue;
                             }
                             set_no_block(new_sock);//设置套接字为非阻塞                            
                             ev.events=EPOLLIN|EPOLLET;
                             ev.data.fd=new_sock;
                             epoll_ctl(epoll_fd,EPOLL_CTL_ADD,new_sock,&ev);
                             printf("get a new client...\n");
                         }
                         else  //normal sock
                         {
                             if(ret_ev[i].events&EPOLLIN)
                             {
                                 int fd=ret_ev[i].data.fd;
                                 data_buf_p mem=(data_buf_p)malloc(sizeof(data_buf_t));
                                 if(!mem)
                                 {  
                                     perror("malloc");                                                                      
                                     continue;
                                 }
                                 mem->fd=fd;
                                 memset(mem->buf,‘\0‘,sizeof(mem->buf));
                                 ssize_t _s=read_data(mem->fd,mem->buf,sizeof(mem -> buf)-1);//一次读完
                                 if(_s>0)
                                 {
                                     mem->buf[_s-1]=‘\0‘;
                                     printf("client: %s\n",mem->buf);
                                     ev.events=EPOLLOUT|EPOLLET;
                                     ev.data.ptr=mem;
                                     epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ev);
                                 }
                                 else if(_s==0)
                                 {
                                     printf("client close...\n");
                                     epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
                                     close(fd);
                                     free(mem);
                                 }
                                 else
                                 {
                                     continue;
                                 }
                             }
                             else if(ret_ev[i].events&EPOLLOUT)  //写事件准备就绪
                             {
                                     data_buf_p mem=(data_buf_p)ret_ev[i].data.ptr;
                                     char* msg="http/1.0 200 ok\r\n\r\nhello bit\r\n";
                               &

以上是关于epoll实现socket通信的主要内容,如果未能解决你的问题,请参考以下文章

c++ 求助socket多线程网络通信怎么实现并发

linux网络编程epoll内核实现代码分析

linux网络编程 - epoll内核实现代码分析

Android Framework实战开发-跨进程通信之 epoll详细讲解

Android Framework实战开发-跨进程通信之 epoll详细讲解

select,poll,epoll的区别以及使用方法