Unix/Linux 编程:网络编程之 epoll与Reactor

Posted sesiria

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Unix/Linux 编程:网络编程之 epoll与Reactor相关的知识,希望对你有一定的参考价值。

在上一篇文章中实现了epoll的非阻塞IO的服务端程序

可是如果实际应用中需要注册大量事件,一个超长的if代码块会严重降低代码的可读性和管理

因此参考了 Reactor模型的实现改写了epoll服务器。讲所有的事件处理函数都写成回调函数的形式。这里注意几个参数传递时候的坑。

/* Server based on EPOLL */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH   1024

struct sockitem {
    int epollfd;
    int sockfd;
    int (*callback)(int fd, int events, void* arg);
};

int recv_cb(int fd, int events, void* arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;
    printf("trigger recv_cb\\n");
    struct epoll_event* ev = (struct epoll_event*)arg;
    struct sockitem* si = (struct sockitem*)ev->data.ptr;

    int clientfd = si->sockfd;
    int epoll_fd = si->epollfd;

    char buffer[BUFFER_LENGTH] = { 0 };
    int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
    if (ret < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            printf("read all data\\n");
        }
        close(clientfd);
        ev->events = EPOLLIN | EPOLLET;
        ev->data.fd = clientfd;
        ev->data.ptr = NULL;
        free(si);
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, ev);
    }
    else if (ret == 0) {
        printf(" disconnect clientfd:%d\\n", clientfd);

        close(clientfd);
        ev->events = EPOLLIN | EPOLLET;
        ev->data.fd = clientfd;
        ev->data.ptr = NULL;
        free(si);
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, ev);
        return 0;
    }
    else {
        printf("Recv: %s, %d Bytes\\n", buffer, ret);
        if (send(clientfd, "Hello, you are connected!\\n", 26, 0)
            == -1) {
            // send data error maybe some network issue.
            perror("send error");
            close(clientfd);
        }
    }
}

// callback handler for accept events of the sockfd.
int accept_cb(int fd, int events, void* arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;
    printf("trigger accept_cb\\n");
    struct epoll_event* ev = (struct epoll_event*)arg;
    struct sockitem* psi = (struct sockitem*)ev->data.ptr;
    int epoll_fd = psi->epollfd;

    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(struct sockaddr_in));
    socklen_t client_len = sizeof(client_addr);
    int sockfd = psi->sockfd;
    int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
    printf("clientfd: %d\\n", clientfd);
    if (clientfd <= 0)
        return -1; // do nothing.
    printf("trigger malloc\\n");
    char str[INET_ADDRSTRLEN] = { 0 };

    struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
    si->epollfd = epoll_fd;
    si->sockfd = clientfd;
    si->callback = recv_cb;

    printf("received from %s at port:%d, sockfd:%d, clientfd:%d\\n",
        inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
        ntohs(client_addr.sin_port), sockfd, clientfd);

    ev->events = EPOLLIN | EPOLLET;
    ev->data.fd = clientfd;
    ev->data.ptr = si;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, ev);
}

// ./epoll 8080.
int main(int argc, char* argv[]) {

    if (argc < 2) {
        printf("parameter error!\\n");
        exit(EXIT_FAILURE);
    }

    int port = atoi(argv[1]);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_port = htons(port); // convert network bytes
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        exit(EXIT_FAILURE);
    }

    printf("start server and wait for connection...\\n");
    if (listen(sockfd, 5) < 0) {
        exit(EXIT_FAILURE);
    }

    // epoll coding.
    int epfd = epoll_create(1); // create the epoll root node for epoll object.
    struct epoll_event ev, events[1024] = { 0 };
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    // define socketitem
    struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
    si->epollfd = epfd;
    si->sockfd = sockfd;
    si->callback = accept_cb;
    ev.data.ptr = si;

    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

    while (1) {
        // condition wait.
        // max amount of events could be caught by 1 epoll_wait call.
        int nready = epoll_wait(epfd, events, 1024, -1);
        if (nready < -1) {
            break;
        }

        int i = 0;
        for (i = 0; i < nready; i++) {

            if (events[i].events & EPOLLIN) {
                struct sockitem* si = (struct sockitem*)events[i].data.ptr;
                si->callback(events[i].data.fd, events[i].events, &events[i]);
            }

            if (events[i].events & EPOLLOUT) {
                printf("trigger EPOLLOUT\\n");
            }
        }
    }
    close(sockfd);
    exit(EXIT_SUCCESS);
}

经过修改后的epoll

将事件的处理与epoll_wait分离。并添加了支持recv_cb, send_cb。并将epoll事件管理器数据结构封装为一个reactor结构体

/* Server based on EPOLL */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define BUFFER_LENGTH   1024
#define EVENT_SIZE      1024

struct sockitem {
    int sockfd;
    int (*callback)(int events, void* arg);
    char recvbuffer[BUFFER_LENGTH];
    char sendbuffer[BUFFER_LENGTH];
};

struct reactor{
    int epollfd;
    struct epoll_event events[EVENT_SIZE];
};

struct reactor * g_eventloop = NULL;

int recv_cb(int events, void* arg);

int send_cb(int events, void *arg) {
    if (!(events & EPOLLOUT) || arg == NULL)
        return -1;
    struct sockitem* si = (struct sockitem*)arg;

    int clientfd = si->sockfd;
    int epoll_fd = g_eventloop->epollfd;
    if (send(clientfd, "Hello, you are connected!\\n", 26, 0)
            == -1) {
        if(errno == EAGAIN || errno == EWOULDBLOCK) {
                // the send buffer is full.
                // add send into EPOLLOUT event.
            return 0;
        }
            // send data error maybe some network issue.
        perror("send error");
        close(clientfd);
        return -1;
    }
    // change state into EPOLLIN
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;
    si->callback = recv_cb;
    ev.data.ptr = si;
    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &ev);
    return 0;
}

int recv_cb(int events, void* arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;

    struct sockitem* si = (struct sockitem*)arg;

    int clientfd = si->sockfd;
    int epoll_fd = g_eventloop->epollfd;

    char buffer[BUFFER_LENGTH] = { 0 };
    struct epoll_event ev;
    int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
    if (ret < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            printf("read all data\\n");
        }
        close(clientfd);
        ev.events = EPOLLIN | EPOLLET;
        //ev->data.fd = clientfd;
        ev.data.ptr = NULL;
        free(si);
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
    }
    else if (ret == 0) {
        printf(" disconnect clientfd:%d\\n", clientfd);

        close(clientfd);
        ev.events = EPOLLIN | EPOLLET;
        //ev->data.fd = clientfd;
        ev.data.ptr = NULL;
        free(si);
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
        return 0;
    }
    else {
        printf("Recv: %s, %d Bytes\\n", buffer, ret);
        ev.events = EPOLLOUT | EPOLLET;
        si->sockfd = clientfd;
        si->callback = send_cb;
        ev.data.ptr = si;
        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &ev);
    }
}

// callback handler for accept events of the sockfd.
int accept_cb(int events, void* arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;

    struct sockitem* psi = (struct sockitem*)arg;
    int epoll_fd = g_eventloop->epollfd;
    int sockfd = psi->sockfd;

    struct sockaddr_in client_addr;
    struct epoll_event ev;
    memset(&client_addr, 0, sizeof(struct sockaddr_in));
    socklen_t client_len = sizeof(client_addr);

    int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
    printf("clientfd: %d\\n", clientfd);
    if (clientfd <= 0)
        return -1; // do nothing.

    char str[INET_ADDRSTRLEN] = { 0 };

    struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
    si->sockfd = clientfd;
    si->callback = recv_cb;

    printf("received from %s at port:%d, sockfd:%d, clientfd:%d\\n",
        inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),
        ntohs(client_addr.sin_port), sockfd, clientfd);

    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = si;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
}

// ./epoll 8080.
int main(int argc, char* argv[]) {

    if (argc < 2) {
        printf("parameter error!\\n");
        exit(EXIT_FAILURE);
    }

    int port = atoi(argv[1]);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));

    addr.sin_family = AF_INET;
    addr.sin_port = htons(port); // convert network bytes
    addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
        exit(EXIT_FAILURE);
    }

    printf("start server and wait for connection...\\n");
    if (listen(sockfd, 5) < 0) {
        exit(EXIT_FAILURE);
    }

    g_eventloop = (struct reactor*) malloc(sizeof(struct reactor));


    // epoll coding.
    g_eventloop->epollfd = epoll_create(1); // create the epoll root node for epoll object.
    struct epoll_event ev;
    ev.events = EPOLLIN;
    //ev.data.fd = sockfd;


    // define socketitem
    struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
    si->sockfd = sockfd;
    si->callback = accept_cb;
    ev.data.ptr = si;

    epoll_ctl(g_eventloop->epollfd, EPOLL_CTL_ADD, sockfd, &ev);

    while (1) {
        // condition wait.
        // max amount of events could be caught by 1 epoll_wait call.
        int nready = epoll_wait(g_eventloop->epollfd, g_eventloop->events, EVENT_SIZE, -1);
        if (nready < -1) {
            break;
        }

        int i = 0;
        for (i = 0; i < nready; i++) {

            if (g_eventloop->events[i].events & EPOLLIN) {
                struct sockitem* si = (struct sockitem*)g_eventloop->events[i].data.ptr;
                if(si && si->callback)
                    si->callback(g_eventloop->events[i].events, si);
            }

            if (g_eventloop->events[i].events & EPOLLOUT) {
                struct sockitem* si = (struct sockitem*)g_eventloop->events[i].data.ptr;
                if(si && si->callback)
                    si->callback(g_eventloop->events[i].events, si);
            }
        }
    }
    close(sockfd);
    exit(EXIT_SUCCESS);
}

 

以上是关于Unix/Linux 编程:网络编程之 epoll与Reactor的主要内容,如果未能解决你的问题,请参考以下文章

Unix/Linux 编程:网络编程之 IO模型

Unix/Linux 编程:网络编程之 线程池

Unix/Linux 编程:网络编程之 内存池

Linux网络编程——多路复用之epoll

高并发网络编程之epoll详解

python 网络编程 IO多路复用之epoll