Unix/Linux 编程:网络编程之 epoll与Reactor
Posted sesiria
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Unix/Linux 编程:网络编程之 epoll与Reactor相关的知识,希望对你有一定的参考价值。
在上一篇文章中实现了epoll的非阻塞IO的服务端程序
可是在实际应用中如果需要注册大量事件,一个超长的 if 代码块会严重降低代码的可读性和可维护性。
因此参考 Reactor 模型的实现改写了 epoll 服务器,将所有的事件处理函数都写成回调函数的形式。这里要注意参数传递时的几个坑(例如 epoll_event.data 是一个联合体,data.fd 与 data.ptr 共用同一块内存,不能同时使用)。
/* Server based on EPOLL */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define BUFFER_LENGTH 1024
/*
 * Per-socket registration record.  A pointer to one of these is stored in
 * epoll_event.data.ptr when the socket is added to the epoll set, so the
 * event loop can find the handler for a ready socket.
 */
struct sockitem {
    int epollfd;   /* epoll instance the socket is registered with */
    int sockfd;    /* the listening or connected socket itself */

    /* Handler invoked by the event loop when the socket reports an event. */
    int (*callback)(int fd, int events, void *arg);
};
/*
 * Read-ready callback for a connected client socket.
 *
 * fd     - ignored.  The event loop derives it from epoll_event.data.fd,
 *          which shares storage with data.ptr, so the socket is always taken
 *          from the sockitem instead.
 * events - epoll event mask reported for the socket; must contain EPOLLIN.
 * arg    - the struct epoll_event filled in by epoll_wait(); its data.ptr is
 *          the heap-allocated struct sockitem of this client.
 *
 * Clients are registered edge-triggered, so the socket is drained until
 * recv() reports EAGAIN.  Each chunk is printed and answered with a fixed
 * greeting.  On peer shutdown or on a hard error the socket is removed from
 * the epoll set, closed, and its sockitem is freed.
 *
 * Returns 0 when the socket was drained or the peer disconnected cleanly,
 * -1 on bad arguments or on a recv/send failure.
 */
int recv_cb(int fd, int events, void *arg) {
    (void)fd;
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;
    printf("trigger recv_cb\n");

    struct epoll_event *ev = (struct epoll_event *)arg;
    struct sockitem *si = (struct sockitem *)ev->data.ptr;
    if (si == NULL)
        return -1;

    static const char greeting[] = "Hello, you are connected!\n";
    const int clientfd = si->sockfd;
    const int epoll_fd = si->epollfd;
    int status = 0;

    for (;;) {
        char buffer[BUFFER_LENGTH];
        /* Leave one byte for the terminator; MSG_DONTWAIT keeps the drain
         * loop from blocking even if the socket itself is in blocking mode. */
        ssize_t ret = recv(clientfd, buffer, sizeof(buffer) - 1, MSG_DONTWAIT);

        if (ret > 0) {
            buffer[ret] = '\0';
            printf("Recv: %s, %d Bytes\n", buffer, (int)ret);
            /* MSG_NOSIGNAL: a vanished peer must yield EPIPE, not SIGPIPE. */
            if (send(clientfd, greeting, sizeof(greeting) - 1, MSG_NOSIGNAL) == -1) {
                perror("send error");
                status = -1;
                break;
            }
            continue;
        }
        if (ret == 0) {
            printf(" disconnect clientfd:%d\n", clientfd);
            break;
        }
        if (errno == EINTR)
            continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            /* Nothing left to read: the connection stays open and registered. */
            printf("read all data\n");
            return 0;
        }
        perror("recv error");
        status = -1;
        break;
    }

    /* Deregister before close(): once the fd is closed, EPOLL_CTL_DEL fails. */
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, NULL);
    close(clientfd);
    ev->data.ptr = NULL; /* do not leave a dangling pointer in the event slot */
    free(si);
    return status;
}
/*
 * Accept-ready callback for the listening socket.
 *
 * fd     - ignored (epoll_event.data.fd shares storage with data.ptr); the
 *          listening socket is taken from the sockitem.
 * events - epoll event mask reported for the listener; must contain EPOLLIN.
 * arg    - the struct epoll_event filled in by epoll_wait(); its data.ptr is
 *          the listener's struct sockitem.
 *
 * Accepts one pending connection, switches it to non-blocking mode, allocates
 * a sockitem whose handler is recv_cb, and registers the new socket with the
 * same epoll instance in edge-triggered mode.
 *
 * Returns 0 on success, -1 on bad arguments or if any step fails (in which
 * case the accepted socket, if any, is closed again).
 */
int accept_cb(int fd, int events, void *arg) {
    (void)fd;
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;
    printf("trigger accept_cb\n");

    struct epoll_event *ev = (struct epoll_event *)arg;
    struct sockitem *psi = (struct sockitem *)ev->data.ptr;
    if (psi == NULL)
        return -1;
    int epoll_fd = psi->epollfd;
    int sockfd = psi->sockfd;

    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(client_addr));
    socklen_t client_len = sizeof(client_addr);

    int clientfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
    printf("clientfd: %d\n", clientfd);
    if (clientfd < 0) /* 0 is a valid descriptor; only negative means failure */
        return -1;

    /* Edge-triggered registration is meant to be used with non-blocking fds. */
    int flags = fcntl(clientfd, F_GETFL, 0);
    if (flags == -1 || fcntl(clientfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl");
        close(clientfd);
        return -1;
    }

    printf("trigger malloc\n");
    struct sockitem *si = malloc(sizeof(*si));
    if (si == NULL) {
        perror("malloc");
        close(clientfd);
        return -1;
    }
    si->epollfd = epoll_fd;
    si->sockfd = clientfd;
    si->callback = recv_cb;

    char str[INET_ADDRSTRLEN] = { 0 };
    const char *ip = inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str));
    printf("received from %s at port:%d, sockfd:%d, clientfd:%d\n",
           ip != NULL ? ip : "?", ntohs(client_addr.sin_port), sockfd, clientfd);

    /* Use a local event instead of overwriting the caller's epoll_wait slot.
     * Only data.ptr is set: data.fd lives in the same union. */
    struct epoll_event client_ev;
    memset(&client_ev, 0, sizeof(client_ev));
    client_ev.events = EPOLLIN | EPOLLET;
    client_ev.data.ptr = si;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &client_ev) == -1) {
        perror("epoll_ctl");
        free(si);
        close(clientfd);
        return -1;
    }
    return 0;
}
/*
 * Entry point of the epoll-based TCP server.
 *
 * Usage: ./epoll <port>      e.g. ./epoll 8080
 *
 * Creates a listening socket on the given port, registers it with a new
 * epoll instance using accept_cb as its handler, and then dispatches ready
 * sockets to their sockitem callbacks forever.  Exits with EXIT_FAILURE if
 * setup fails or epoll_wait() reports an unrecoverable error.
 */
int main(int argc, char *argv[]) {
    if (argc < 2) {
        printf("parameter error!\n");
        exit(EXIT_FAILURE);
    }

    /* strtol rather than atoi so malformed or out-of-range ports are rejected. */
    char *end = NULL;
    errno = 0;
    long port = strtol(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' || port < 1 || port > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((in_port_t)port); /* host to network byte order */
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("start server and wait for connection...\n");
    if (listen(sockfd, 5) < 0) {
        perror("listen");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* The size hint of epoll_create() only has to be positive. */
    int epfd = epoll_create(1);
    if (epfd < 0) {
        perror("epoll_create");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Registration record for the listening socket. */
    struct sockitem *listener = malloc(sizeof(*listener));
    if (listener == NULL) {
        perror("malloc");
        close(epfd);
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    listener->epollfd = epfd;
    listener->sockfd = sockfd;
    listener->callback = accept_cb;

    /* data is a union: only data.ptr is set, data.fd would alias it. */
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.ptr = listener;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
        perror("epoll_ctl");
        free(listener);
        close(epfd);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Upper bound on the events returned by a single epoll_wait() call. */
    enum { MAX_EVENTS = 1024 };
    struct epoll_event events[MAX_EVENTS];
    int status = EXIT_SUCCESS;

    while (1) {
        int nready = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (nready < 0) {
            if (errno == EINTR)
                continue; /* interrupted by a signal: just wait again */
            perror("epoll_wait");
            status = EXIT_FAILURE;
            break;
        }

        for (int i = 0; i < nready; i++) {
            /* Snapshot the mask: callbacks receive the slot and may modify it. */
            const unsigned int mask = events[i].events;

            if (mask & EPOLLIN) {
                struct sockitem *item = (struct sockitem *)events[i].data.ptr;
                /* Pass the real socket; data.fd is not valid once data.ptr is set. */
                if (item != NULL && item->callback != NULL)
                    item->callback(item->sockfd, (int)mask, &events[i]);
            }
            if (mask & EPOLLOUT) {
                printf("trigger EPOLLOUT\n");
            }
        }
    }

    free(listener);
    close(epfd);
    close(sockfd);
    exit(status);
}
经过修改后的epoll
将事件的处理与 epoll_wait 分离,添加了对 recv_cb、send_cb 的支持,并将 epoll 事件管理器的数据结构封装为一个 reactor 结构体。
/* Server based on EPOLL */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define BUFFER_LENGTH 1024
#define EVENT_SIZE 1024
/*
 * Per-socket registration record.  A pointer to one of these is stored in
 * epoll_event.data.ptr and handed back to the callback as its `arg`.
 */
struct sockitem {
    int sockfd;    /* the listening or connected socket */

    /* Handler currently installed for this socket; the callbacks swap it
     * between recv_cb and send_cb as the connection changes direction. */
    int (*callback)(int events, void *arg);

    /* Per-connection buffers, BUFFER_LENGTH bytes each. */
    char recvbuffer[BUFFER_LENGTH];
    char sendbuffer[BUFFER_LENGTH];
};
/*
 * Event-loop state: the epoll instance together with the array that
 * epoll_wait() fills with ready events.
 */
struct reactor {
    int epollfd;                            /* descriptor from epoll_create() */
    struct epoll_event events[EVENT_SIZE];  /* output buffer for epoll_wait() */
};
/* Process-wide reactor instance; stays NULL until main() allocates it. */
struct reactor *g_eventloop = NULL;

/* Forward declaration: send_cb re-installs recv_cb, which is defined after it. */
int recv_cb(int events, void *arg);
/*
 * Write-ready callback for a connected client socket.
 *
 * events - epoll event mask reported for the socket; must contain EPOLLOUT.
 * arg    - the client's heap-allocated struct sockitem.
 *
 * Sends the fixed greeting.  On success the socket is switched back to
 * edge-triggered EPOLLIN with recv_cb as its handler.  If the kernel send
 * buffer is full (EAGAIN) the socket stays registered for EPOLLOUT and the
 * send is retried on the next writable notification.  On any other failure
 * the socket is removed from the epoll set, closed, and its sockitem freed.
 *
 * Returns 0 on success or EAGAIN, -1 on bad arguments or failure.
 */
int send_cb(int events, void *arg) {
    if (!(events & EPOLLOUT) || arg == NULL)
        return -1;

    static const char greeting[] = "Hello, you are connected!\n";
    struct sockitem *si = (struct sockitem *)arg;
    const int clientfd = si->sockfd;
    const int epoll_fd = g_eventloop->epollfd;
    int failed = 0;

    /* MSG_NOSIGNAL: report a vanished peer as EPIPE instead of SIGPIPE.
     * MSG_DONTWAIT: never stall the event loop on a full send buffer. */
    ssize_t sent;
    do {
        sent = send(clientfd, greeting, sizeof(greeting) - 1,
                    MSG_NOSIGNAL | MSG_DONTWAIT);
    } while (sent == -1 && errno == EINTR);

    if (sent == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            /* Still registered for EPOLLOUT: retry on the next writable edge. */
            return 0;
        }
        /* Any other error, e.g. a broken connection. */
        perror("send error");
        failed = 1;
    } else {
        /* Reply delivered: go back to waiting for input. */
        struct epoll_event ev;
        memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLIN | EPOLLET;
        ev.data.ptr = si;
        si->callback = recv_cb;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &ev) == -1) {
            perror("epoll_ctl");
            failed = 1;
        }
    }

    if (failed) {
        /* Deregister before close(), then release the registration record. */
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, NULL);
        close(clientfd);
        free(si);
        return -1;
    }
    return 0;
}
/*
 * Read-ready callback for a connected client socket.
 *
 * events - epoll event mask reported for the socket; must contain EPOLLIN.
 * arg    - the client's heap-allocated struct sockitem.
 *
 * Reads one chunk from the client and prints it, then re-registers the
 * socket for edge-triggered EPOLLOUT with send_cb as its handler so the
 * reply is sent once the socket is writable.  If no data is available
 * (EAGAIN) the connection is left untouched.  On peer shutdown or on a hard
 * error the socket is removed from the epoll set, closed, and its sockitem
 * is freed.
 *
 * Returns 0 when data was read, nothing was available, or the peer
 * disconnected cleanly; -1 on bad arguments or failure.
 */
int recv_cb(int events, void *arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;

    struct sockitem *si = (struct sockitem *)arg;
    const int clientfd = si->sockfd;
    const int epoll_fd = g_eventloop->epollfd;
    char buffer[BUFFER_LENGTH];
    int status = 0;

    /* One byte is reserved for the terminator needed by the %s below;
     * MSG_DONTWAIT keeps a spurious wake-up from blocking the event loop. */
    ssize_t ret;
    do {
        ret = recv(clientfd, buffer, sizeof(buffer) - 1, MSG_DONTWAIT);
    } while (ret < 0 && errno == EINTR);

    if (ret > 0) {
        buffer[ret] = '\0';
        printf("Recv: %s, %d Bytes\n", buffer, (int)ret);

        /* Hand the connection over to send_cb for the reply. */
        struct epoll_event ev;
        memset(&ev, 0, sizeof(ev));
        ev.events = EPOLLOUT | EPOLLET;
        ev.data.ptr = si;
        si->callback = send_cb;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &ev) == 0)
            return 0;
        perror("epoll_ctl");
        status = -1;
    } else if (ret == 0) {
        printf(" disconnect clientfd:%d\n", clientfd);
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Nothing to read right now: keep the connection open. */
        printf("read all data\n");
        return 0;
    } else {
        perror("recv error");
        status = -1;
    }

    /* Deregister before close(): once the fd is closed, EPOLL_CTL_DEL fails. */
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, NULL);
    close(clientfd);
    free(si);
    return status;
}
/*
 * Accept-ready callback for the listening socket.
 *
 * events - epoll event mask reported for the listener; must contain EPOLLIN.
 * arg    - the listener's struct sockitem.
 *
 * Accepts one pending connection, switches it to non-blocking mode, allocates
 * a zeroed sockitem whose handler is recv_cb, and registers the new socket
 * with the global reactor's epoll instance in edge-triggered mode.
 *
 * Returns 0 on success, -1 on bad arguments or if any step fails (in which
 * case the accepted socket, if any, is closed again).
 */
int accept_cb(int events, void *arg) {
    if (!(events & EPOLLIN) || arg == NULL)
        return -1;

    struct sockitem *psi = (struct sockitem *)arg;
    const int epoll_fd = g_eventloop->epollfd;
    const int sockfd = psi->sockfd;

    struct sockaddr_in client_addr;
    memset(&client_addr, 0, sizeof(client_addr));
    socklen_t client_len = sizeof(client_addr);

    int clientfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
    printf("clientfd: %d\n", clientfd);
    if (clientfd < 0) /* 0 is a valid descriptor; only negative means failure */
        return -1;

    /* Edge-triggered registration is meant to be used with non-blocking fds. */
    int flags = fcntl(clientfd, F_GETFL, 0);
    if (flags == -1 || fcntl(clientfd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl");
        close(clientfd);
        return -1;
    }

    /* calloc so the per-connection buffers start out zeroed. */
    struct sockitem *si = calloc(1, sizeof(*si));
    if (si == NULL) {
        perror("calloc");
        close(clientfd);
        return -1;
    }
    si->sockfd = clientfd;
    si->callback = recv_cb;

    char str[INET_ADDRSTRLEN] = { 0 };
    const char *ip = inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str));
    printf("received from %s at port:%d, sockfd:%d, clientfd:%d\n",
           ip != NULL ? ip : "?", ntohs(client_addr.sin_port), sockfd, clientfd);

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = si;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev) == -1) {
        perror("epoll_ctl");
        free(si);
        close(clientfd);
        return -1;
    }
    return 0;
}
/*
 * Entry point of the reactor-style epoll TCP server.
 *
 * Usage: ./epoll <port>      e.g. ./epoll 8080
 *
 * Creates a listening socket on the given port, allocates the global reactor
 * (epoll instance plus event array), registers the listener with accept_cb
 * as its handler, and then dispatches every ready socket to the callback
 * currently installed in its sockitem.  Exits with EXIT_FAILURE if setup
 * fails or epoll_wait() reports an unrecoverable error.
 */
int main(int argc, char *argv[]) {
    if (argc < 2) {
        printf("parameter error!\n");
        exit(EXIT_FAILURE);
    }

    /* strtol rather than atoi so malformed or out-of-range ports are rejected. */
    char *end = NULL;
    errno = 0;
    long port = strtol(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' || port < 1 || port > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons((in_port_t)port); /* host to network byte order */
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    printf("start server and wait for connection...\n");
    if (listen(sockfd, 5) < 0) {
        perror("listen");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    g_eventloop = malloc(sizeof(*g_eventloop));
    if (g_eventloop == NULL) {
        perror("malloc");
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* The size hint of epoll_create() only has to be positive. */
    g_eventloop->epollfd = epoll_create(1);
    if (g_eventloop->epollfd < 0) {
        perror("epoll_create");
        free(g_eventloop);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    /* Registration record for the listening socket. */
    struct sockitem *listener = calloc(1, sizeof(*listener));
    if (listener == NULL) {
        perror("calloc");
        close(g_eventloop->epollfd);
        free(g_eventloop);
        close(sockfd);
        exit(EXIT_FAILURE);
    }
    listener->sockfd = sockfd;
    listener->callback = accept_cb;

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.ptr = listener;
    if (epoll_ctl(g_eventloop->epollfd, EPOLL_CTL_ADD, sockfd, &ev) < 0) {
        perror("epoll_ctl");
        free(listener);
        close(g_eventloop->epollfd);
        free(g_eventloop);
        close(sockfd);
        exit(EXIT_FAILURE);
    }

    int status = EXIT_SUCCESS;
    while (1) {
        /* Blocks until at least one registered socket is ready. */
        int nready = epoll_wait(g_eventloop->epollfd, g_eventloop->events,
                                EVENT_SIZE, -1);
        if (nready < 0) {
            if (errno == EINTR)
                continue; /* interrupted by a signal: just wait again */
            perror("epoll_wait");
            status = EXIT_FAILURE;
            break;
        }

        for (int i = 0; i < nready; i++) {
            const unsigned int mask = g_eventloop->events[i].events;
            if (!(mask & (EPOLLIN | EPOLLOUT)))
                continue;

            /* Dispatch exactly once per event: a callback may free the item,
             * so it must not be dereferenced a second time afterwards. */
            struct sockitem *item =
                (struct sockitem *)g_eventloop->events[i].data.ptr;
            if (item != NULL && item->callback != NULL)
                item->callback((int)mask, item);
        }
    }

    free(listener);
    close(g_eventloop->epollfd);
    free(g_eventloop);
    g_eventloop = NULL;
    close(sockfd);
    exit(status);
}
以上是关于Unix/Linux 编程:网络编程之 epoll与Reactor的主要内容,如果未能解决你的问题,请参考以下文章