网络编程I/O复用函数

Posted SalvationN

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络编程I/O复用函数相关的知识,希望对你有一定的参考价值。

I/O复用使得程序能够同时监听和处理多个文件描述符,提高程序的效率。主流的系统调用主要有select,poll和epoll。

select

系统调用原型

int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* expectfds, struct timeval* timeout);
其中,nfds为select监听的最大文件描述符个数+1,fd_set是一个数据结构,其本质上是一个整形数组,数组中的每一个位都标记一个文件描述符,而fd_set的容量是由内核决定的,即select能同时处理的文件数量是有限的。
fd_set操作的一些宏定义:

  1. FD_ZERO(fd_set *fdset) —— 清除fdset所有的位
  2. FD_SET(int fd, fd_set *fdset) —— 设置fd位
  3. FD_CLR(int fd, fd_set *fdset) —— 清除fd位
  4. int FD_ISET(int fd, fd_set *fdset) —— 测试fd位是否被设置

使用select实现简易TCP服务器

代码略长,这里只提供伪代码,并分析调试过程中碰到的一些问题和对socket编程的理解,做一些笔记。在实际代码中,我对几个模块做了封装,在这里只分析代码逻辑和流程,不纠结面向对象的问题。
server侧伪代码:

// 第一部分:创建监听socket 
listenfd = socket(...);
bind(...);
listen(...);

// 第二部分:监控文件描述符集的初始化

// client_fdset用于存放已连接的socket,tmp_fdset可以看做是client_fdset的拷贝,作为select的输入
fd_set client_fdset, tmp_fdset;     
FD_ZERO();
// 将listenfd加入到fdset中,方便监听是否有连接进入
FD_SET(listenfd, &...);
// 设置select的timeval
struct timeval tv;
......
// 活动的sockfd集合,可以用数组,也可以用vector,这里...代表的自己创建的socket类,包含bind等操作
vector<...> client_sock;

// 第三部分:while循环监听是否有读写事件发生以及是否有新的连接请求

while(1) {
    // client_sock实际相当于一个暂存的用于表示此次循环中有活动(有连接请求的或有读写请求)的socket集合
    // 因此,在每次循环开始之前需要做一次清空
    // 真正存放已连接的socket的数据结构是client_fdset
    client_sock.clear();
    
    /* select做的事可以概括为:当监听到的fd(包含listenfd和已连接的sockfd,
       后者由accept得到)有事件发生时,清空传入的fd,对有事件发生的fd位置1,并返回 */
    // 由于select会修改fd的这个特性,因此传入select的fdset是一份拷贝
    select(..., tmp_fdset, tv);
    
    
    // 用FD_ISSET检查在tmp_fdset中有哪些fd是有响应的;若有响应,则添加到client_sock中
    for(int i=0;i<=maxfd;i++) {
        if(ISSET(i,&tmpfd)){
            ......
            client_sock.push_back(...);
        }
    }
    
    // 在有响应的client_sock中,根据sockfd区分其是监听socket还是连接的socket,从而做不同处理
    for(int i=0;i<client_sock.size();i++) {
        // 监听socket有响应,则接受新连接
        if(client_sock[i].getfd() == listen_sock) {
            int accfd = accept(......);
            FD_SET(accfd, &client_fdset);
        }
        // 有读写事件
        else {
            recv(...);
            ......
        }
    }
}

客户端没有什么值得记录的地方,按照socket()->connect()->send()这个流程走下来就可以了。

在调试的过程中,遇到了几个问题,在这里记录一下:

  1. 开启多个客户端时,为何其返回的fd是相同的?
  2. 若未对sockaddr做初始化或赋值,其默认值应该是什么样的?
  3. fdset和client_sock[]在服务器处理过程中扮演了什么样的角色?经过了怎样的变化?
  4. 在单线程的前提下,若出现同时有连接和读写事件时,服务器会如何表现?

调试后得出的答案:

  1. 理论上,socket()返回的fd应该是不同的,但这种情况仅限于整个操作系统共享一个文件描述符表。在linux中,每个进程都有一张自己的文件描述符表,因此,socket()返回的文件描述符是基于进程自身的,返回相同的文件描述符是正常现象。
  2. 0.0.0.0:0
  3. 在注释中提到过,在select调用中,内核真正会处理的数据结构是FDSET,传入的fdset中,位置为1的表示需要select去监听的文件描述符。但内核在处理过程中,会修改传入的fdset值,即:对于有事件发生的fd,将其在fdset的中的位置1。因此在这里有一个问题是需要处理的——我们的fdset记录了目前处于连接的acceptfd,若执行一次select后会清空fdset(返回有响应的fd),那我们会丢失连接fdset的信息。为避免这种情况发生,有两种解决方案:① 复制fdset,即传入select的fdset为拷贝版,如上代码所示,client_fdset记录已连接的fd,而tmp_fdset作为传入select调用的fdset;② 可仅使用一个fdset,用一个数组存已连接的fd,只在每次循环前将已连接的fd写入fdset中。

poll

poll实质上与select没有什么区别,其本质都是让监听一个文件描述符集,返回有响应的文件描述符,只是poll在具体实现上较为不同,这一点主要体现在其所用的数据结构上。

系统调用原型

int poll(struct pollfd* fds, nfds_t nfds, int timeout);
其中,poll和select最大的不同在于pollfd结构,该结构体如下:

struct pollfd {
    int fd;
    short events;   // 监听/注册的事件
    short revents;  // 实际响应的事件,由内核填充并返回
};

可以看到, poll将select中的FDSET和记录响应事件的client_sock封装在一个结构体中,避免了select对FD_SET的覆盖问题。并且,poll返回的数据是带“状态”的,即poll是有事件类型的,如POLLPRI(高优先级数据可读)、POLLWRBAND(优先级带数据可写)等。
nfds参数指定被监听的事件集合大小;timecout是超时时间,单位为毫秒

基于epoll的TCP服务器

由于这里的思路和select基本一致,所以这里略去大部分代码,只记录poll调用之后如何处理与判断有响应的fd的:

int ret = poll( pollfds, MAX_OPEN, -1);
for(int i=0; i< MAX_OPEN; ++i) {
    if(fds[i].revents & POLLIN) {
        int sockfd = fds[i].fd;
        ......
    }
}

epoll

epoll的实现方式与select和poll有较大的差异,主要体现在两点:① epoll用一组函数而不是一个函数来完成任务;② epoll将要监听的文件描述符放在内核的一个事件表中(事件表本身占用一个额外的文件描述符),因此不需要像select和epoll一样每次都要传递FDSET或pollfds。

系统调用原型

epoll完成任务主要通过三个函数。

int epoll_create(int size)

这个函数的作用主要是创建事件表,size是给内核一个提示,告诉它事件表需要多大。该函数返回一个fd,这个fd即事件表,将被用于后续的epoll处理。

int epoll_ctl(int epfd, int op, struct epoll_event *event)

这个函数是对事件表做一些操作,和select中那几个对FDSET进行操作的函数类似。
其中,epfd就是上面返回的事件表,op即对事件表的操作类型,包括三种:EPOLL_CTL_ADD——注册fd上的事件、EPOLL_CTL_MOD——修改fd上的注册事件、EPOLL_CTL_DEL——删除fd上的注册事件;event是指事件类型,这与poll的事件类型基本相同,不过是以EPOLL开头,但epoll有两个特殊的事件类型:EPOLLET和EPOLLSHOT,EPOLLET代表epoll将使用ET模式即边缘触发模式,这将大大提高epoll的效率,而EPOLLSHOT代表我们期望一个socket在任一时刻都只能被一个线程处理。
epoll_event的结构如下:

struct epoll_event {
    _uint32_t events;   // 这里的event就是上面说的事件类型,不过被定义成了宏
    epoll_data_t data;
};

struct epoll_data {
    void* ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
};

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

这就是epoll主要的系统调用接口,函数返回有响应的fd个数,epfd和events的含义与上面相同,但值得注意的是,这个events用于记录有响应fd。这也是epoll与poll和select较大的差别,在poll和select中,每次执行后,都需要遍历传入的数组或pollfds来判断每一个fd是否有事件发生;而在epoll中,是直接将有事件发生的fd直接写入epoll_data中的fd,这意味着返回的events只包含了有响应的fds,这大大提高了执行的效率。

epoll的LT和ET模式

上面提到,这是EPOLL的重要特性。epoll在默认情况下采用LT(水平触发)模式:即当epoll_wait检测到有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,等到下一次调用时,epoll_wait还会再次通告此事件;而在ET(边缘触发)模式下,在epoll_wait通知之后,应用程序必须立即处理该事件,之后epoll_wait不会再通知。因此,在ET模式下效率更高。

基于epoll的简易TCP服务器

同样,通过实现服务器客户端的简单通信来理解epoll的作用,由于epoll的特性,着重讨论ET模式和LT模式在实现上的区别。

void addfd(int epollfd, int fd, bool enable_et) {
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if(enable_et) {
        event.events |= EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

// lt mode
void lt(epoll_event* events, int number, int epollfd, int listenfd) {
    printf("lt trigger once.\\n");
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlen = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
            addfd(epollfd, connfd, false);
        }
        else if(events[i].events & EPOLLIN) {
            printf("event trigger once\\n");
            memset(buf, \'\\0\', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
            if(ret < 0) {
                printf("recv error.\\n");
                close(sockfd);
                continue;
            }
            printf("get %d byte message: %s\\n", ret, buf);
        }
        else {
            printf("something else happened.\\n");
        }
    }
}

// et mode
void et(epoll_event* events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    printf("et trigger once.\\n");
    for(int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlen = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlen);
            addfd(epollfd, connfd, true);
        }
        else if(events[i].events & EPOLLIN) {
            printf("event trigger once.\\n");
            while(1) {
                memset(buf, \'\\0\', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                if(ret < 0) {
                    if(errno == EAGAIN || errno == EWOULDBLOCK) {
                        printf("read later.\\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if(ret == 0) {
                    close(sockfd);
                }
                else {
                    printf("get %d bytes message: %s\\n", ret, buf);
                }
            }
        }
        else {
            printf("something else happend.\\n");
        }
    }
}

int main(int argc, char* argv[]) {
    TcpServer listen_sock, accept_sock;
    CHECK_RET(listen_sock.CreateSocket());
    CHECK_RET(listen_sock.Bind("0.0.0.0", 12345));
    CHECK_RET(listen_sock.Listen());
    struct sockaddr_in client;
    socklen_t client_len = sizeof(client);
    
    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    if(epollfd == -1) {
        printf("epoll create error.\\n");
        return -1;
    }
    addfd(epollfd, listen_sock.getfd(), true);

    while(1) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(ret < 0) {
            printf("epoll error.\\n");
            break;
        }
        et(events, ret, epollfd, listen_sock.getfd());
        // lt(events, ret, epollfd, listen_sock.getfd());
    }
    close(listen_sock.getfd());
    return 0;
}

I/O复用的应用——简易聊天室

这里主要实现的功能为:客户端发送消息,服务器接收后将该消息转发给其他连接的客户端。
server.cpp:

#define _GNU_SOUTCE 1
#include "TcpServer.h"
#include <errno.h>
#include <fcntl.h>
#include <poll.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 64
#define FD_LIMIT 65535
#define CHECK_RET(p) if(p != true) {return -1;}

struct client_data {
    sockaddr_in address;
    char* write_buf;
    char buf[BUFFER_SIZE];
};


int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

int main() {
    TcpServer listen_sock, accept_sock;
    CHECK_RET(listen_sock.CreateSocket());
    CHECK_RET(listen_sock.Bind("0.0.0.0", 12345));
    CHECK_RET(listen_sock.Listen());
    struct sockaddr_in client;
    socklen_t client_len = sizeof(client);

    client_data* users = new client_data[FD_LIMIT];
    pollfd fds[USER_LIMIT+1];
    int user_cnt = 0;
    for(int i = 0; i <= USER_LIMIT; i++) {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listen_sock.getfd();
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;

    while(1) {
        int ret = poll(fds, user_cnt+1, -1);
        if(ret < 0) {
            printf("poll error.\\n");
            break;
        }

        for(int i = 0; i < user_cnt + 1; i++) {
            if(fds[i].fd == listen_sock.getfd() && (fds[i].revents & POLLIN)) {
                int connfd = accept(listen_sock.getfd(), (struct sockaddr*)&client, &client_len);
                if(connfd < 0) {
                    perror("accept error: ");
                    continue;
                }
                if(user_cnt >= USER_LIMIT) {
                    const char* info = "too many users!\\n";
                    printf("%s", info);
                    send(connfd, info, strlen(info), 0);
                    close(connfd);
                    continue;
                }
                user_cnt++;
                users[connfd].address = client;
                setnonblocking(connfd);
                fds[user_cnt].fd = connfd;
                // POLLRDHUP is used to judge whether client is closed, which helps server to avoid regarding closing signal as a read event.
                fds[user_cnt].events = POLLIN | POLLRDHUP | POLLERR;
                fds[user_cnt].revents = 0;
                printf("comes a new user, now have %d users\\n", user_cnt);
            }
            else if(fds[i].revents & POLLERR) {
                printf("get an error from %d\\n", fds[i].fd);
                char errors[100];
                memset(errors, \'\\0\', 100);
                socklen_t len = sizeof(errors);
                if(getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &len) < 0) {
                    printf("get socket option failed\\n");
                }
                continue;
            }
            else if(fds[i].revents & POLLRDHUP) {
                users[fds[i].fd] = users[fds[user_cnt].fd];
                fds[i].fd = fds[user_cnt].fd;
                user_cnt--;
                i--;
                printf("a client left.\\n");
            }
            else if(fds[i].revents & POLLIN) {
                int connfd = fds[i].fd;
                memset(users[connfd].buf, \'\\0\', BUFFER_SIZE);
                ret = recv(connfd, users[connfd].buf, BUFFER_SIZE-1, 0);
                printf("get %d bytes of client data from %d: %s\\n", ret, connfd, users[connfd].buf);
                if(ret < 0) {
                    if(errno != EAGAIN) {
                        close(connfd);
                        users[fds[i].fd] = users[fds[user_cnt].fd];
                        fds[i] = fds[user_cnt];
                        user_cnt--;
                        i--;
                    }   
                }
                else if(ret == 0){
                }
                else {
                    for(int j = 1; j <= user_cnt; j++) {
                        if(fds[j].fd == connfd) {
                            continue;
                        }
                        fds[j].events |= ~POLLIN;
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if(fds[i].revents & POLLOUT) {
                int connfd = fds[i].fd;
                if(!users[connfd].write_buf) {
                    continue;
                }
                printf("%s", users[connfd].write_buf);
                ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0);
                users[connfd].write_buf = NULL;
                fds[i].events |= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }
    delete [] users;
    close(listen_sock.getfd());
    return 0;
}

client.cpp:

#define _GNU_SOURCE 1
#include <stdlib.h>
#include <poll.h>
#include <errno.h>
#include "TcpServer.h"
#include <fcntl.h>

#define BUFFER_SIZE 64
#define CHECK_RET(p) if(p != true) return -1;

int main(int argc, char* argv[]) {
    if(argc != 3) {
        printf("usage: ./client [server ip] [server port]\\n");
        return -1;
    }

    char* address = argv[1];
    uint16_t port = atoi(argv[2]);

    TcpServer client_sock;
    
    CHECK_RET(client_sock.CreateSocket());
    CHECK_RET(client_sock.Connect(address, port));

    pollfd fds[2];
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = client_sock.getfd();
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;

    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe(pipefd);

    while(1) {
        ret = poll(fds, 2, -1);
        if(ret < 0) {
            printf("poll error.\\n");
            break;
        }
        if(fds[1].revents & POLLRDHUP) {
            printf("server close the connect.\\n");
            break;
        }
        else if(fds[1].revents & POLLIN) {
            memset(read_buf, \'\\0\', BUFFER_SIZE);
            ret = recv(fds[1].fd, read_buf, BUFFER_SIZE-1, 0);
            printf("receive %d bytes message from server: %s\\n",ret, read_buf);
        }
        if(fds[0].revents & POLLIN) {
            ret = splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
            ret = splice(pipefd[0], NULL, client_sock.getfd(), NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE);
        }
    }
    close(client_sock.getfd());
    return 0;
}

处理方式上的几点改进:

  1. 由于程序的目的不同,服务器在接收消息的同时还要转发,因此,对于每个sockfd,都封装了一个结构体来存储其地址、发送数据指针、接收数据缓存。
  2. 增加了判断客户端退出的步骤。在之前的程序中,由于没有使用POLLRDHUP做退出判断,在客户端断开连接时,服务器会误认为有可读事件发生;在增加了一层判断后,服务器可以做更好的善后处理。另外,在这一段程序中,有客户端程序退出,则将客户端对应fd的最大下标user_cnt挪到当前的下标位置,并把总数-1,这样能够始终保证1~user_cnt对应的客户端始终是保持连接的
  3. 采用splice进行零拷贝:ssize_t splice(int fd_in, loff_t off_in, int fd_out, loff_t off_out, size_t len, unsigned int flags);该函数用于实现两个fd之间数据的零拷贝。注意:fd = 0代表的是标准输入流(可理解为键盘),因此在client.cpp中,第一个splice代表将数据从标准输入流中拷贝到管道的写端,第二个splice将数据从管道中读出并拷贝到sockfd中。

以上是关于网络编程I/O复用函数的主要内容,如果未能解决你的问题,请参考以下文章

unix下网络编程之I/O复用

网络编程:I/O复用

第15章 高并发服务器编程_I/O多路复用

UNIX网络编程笔记—I/O复用select/poll

unix下网络编程之I/O复用

Linux学习_多路I/O复用