第十一章 定时器
这里的定时器主要指的是定时器容器,TCP连接中有保活定时器,为了定期查看TCP连接是否断开,可以用socket选项实现,但是较为麻烦,所以一般都由应用层负责保活,这是定时器的一个运用场景,或者在应用层需要执行一些定时操作,这样就需要一个高效的定时器容器,主要是时间轮和时间堆,当然定时器也可以用SIGALRM信号以及I/O复用实现。
socket选项中我们使用SO_RCVTIMEO和SO_SNDTIMEO两个选项进行设置,我们来看一个例子:
1 /************************************************************************* 2 > File Name: 11-1.cpp 3 > Author: Torrance_ZHANG 4 > Mail: [email protected] 5 > Created Time: Sat 10 Feb 2018 11:13:03 PM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 int timeout_connect(const char* ip, int port, int time) { 12 int ret = 0; 13 struct sockaddr_in address; 14 bzero(&address, sizeof(address)); 15 address.sin_family = AF_INET; 16 inet_pton(AF_INET, ip, &address.sin_addr); 17 address.sin_port = htons(port); 18 19 int sockfd = socket(AF_INET, SOCK_STREAM, 0); 20 assert(sockfd >= 0); 21 22 struct timeval timeout; 23 timeout.tv_sec = time; 24 timeout.tv_usec = 0; 25 socklen_t len = sizeof(timeout); 26 ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len); 27 assert(ret != -1); 28 29 ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address)); 30 if(ret == -1) { 31 if(errno == EINPROGRESS) { 32 printf("connection timeout, process timeout logic \\n"); 33 return -1; 34 } 35 printf("error occur when connection to server\\n"); 36 return -1; 37 } 38 return sockfd; 39 } 40 41 int main(int argc, char** argv) { 42 if(argc <= 2) { 43 printf("usage: %s ip_address port_number\\n", basename(argv[0])); 44 return 1; 45 } 46 const char* ip = argv[1]; 47 int port = atoi(argv[2]); 48 49 int sockfd = timeout_connect(ip, port, 10); 50 if(sockfd < 0) return 1; 51 else return 0; 52 }
我们用5-7的服务器,将监听队列改成1,用两个telnet客户端占用监听队列,这样再运行这个客户端后,等待10s出现超时的结果。
关于SIGALRM信号,我们知道它是定时器到期时系统发给应用程序的通知。基于这个信号,我们设计一个基于升序链表的定时器容器,链表中的定时器按照超时时间升序排序;为了引用方便,我们把它写在头文件中:
/*************************************************************************
    > File Name: lst_timer.h
    > Author: Torrance_ZHANG
    > Mail: [email protected]
    > Created Time: Sat 10 Feb 2018 11:39:56 PM PST
 ************************************************************************/

#ifndef _LST_TIMER_H
#define _LST_TIMER_H

#include<time.h>
#include<stdio.h>
#include<netinet/in.h>

#define BUFFER_SIZE 64
class util_timer;

/* Per-connection state; `timer` is the timer currently guarding this connection. */
struct client_data {
    sockaddr_in address;
    int sockfd;
    char buf[BUFFER_SIZE];
    util_timer* timer;
};

/* One node of the sorted timer list. */
class util_timer {
public:
    util_timer() : expire(0), cb_func(NULL), user_data(NULL), prev(NULL), next(NULL) {}
public:
    time_t expire;                  /* absolute expiry time, compared against time(NULL) */
    void (*cb_func)(client_data*);  /* called by tick() on expiry; skipped when NULL */
    client_data* user_data;         /* argument handed to cb_func */
    util_timer* prev;
    util_timer* next;
};

/*
 * Doubly linked list of timers kept in ascending order of `expire`.
 *
 * The list owns every timer handed to add_timer(): timers are deleted by
 * del_timer(), by tick() once they have fired, and by the destructor.
 */
class sort_timer_lst {
public:
    sort_timer_lst() : head(NULL), tail(NULL) {}
    ~sort_timer_lst() {
        util_timer* tmp = head;
        while(tmp) {
            head = tmp->next;
            delete tmp;
            tmp = head;
        }
    }
    /* Insert `timer` at its sorted position. A NULL timer is ignored. */
    void add_timer(util_timer* timer) {
        if(!timer) return;
        if(!head) {
            timer->prev = NULL;
            timer->next = NULL;
            head = tail = timer;
            return;
        }
        if(timer->expire < head->expire) {
            timer->prev = NULL;
            timer->next = head;
            head->prev = timer;
            head = timer;
            return;
        }
        add_timer(timer, head);
    }
    /*
     * Re-position `timer` after its expiry time has been pushed back.
     * Only a move towards the tail is handled; a timer whose expiry was
     * moved earlier is left where it is.
     */
    void adjust_timer(util_timer* timer) {
        if(!timer) return;
        util_timer* tmp = timer->next;
        /* Already last, or still ordered before its successor: nothing to do. */
        if(!tmp || (timer->expire < tmp->expire)) return;
        if(timer == head) {
            head = head->next;
            head->prev = NULL;
            timer->next = NULL;
            add_timer(timer, head);
        }
        else {
            timer->prev->next = timer->next;
            timer->next->prev = timer->prev;
            add_timer(timer, timer->next);
        }
    }
    /* Unlink `timer` from the list and delete it. A NULL timer is ignored. */
    void del_timer(util_timer* timer) {
        if(!timer) return;
        if((timer == head) && (timer == tail)) {
            delete timer;
            head = NULL;
            tail = NULL;
            return;
        }
        if(timer == head) {
            head = head->next;
            head->prev = NULL;
            delete timer;
            return;
        }
        if(timer == tail) {
            tail = tail->prev;
            tail->next = NULL;
            delete timer;
            return;
        }
        timer->prev->next = timer->next;
        timer->next->prev = timer->prev;
        delete timer;
    }
    /*
     * Fire and delete every timer whose expiry time has been reached.
     * Because the list is sorted, processing stops at the first timer that
     * has not expired yet.
     */
    void tick() {
        if(!head) return;
        printf("timer tick\n");
        time_t cur = time(NULL);
        util_timer* tmp = head;
        while(tmp) {
            if(cur < tmp->expire) break;
            if(tmp->cb_func) tmp->cb_func(tmp->user_data);
            head = tmp->next;
            if(head) head->prev = NULL;
            else tail = NULL;   /* list is now empty: do not leave tail dangling */
            delete tmp;
            tmp = head;
        }
    }
private:
    /* The list owns raw pointers, so copying it would lead to double deletes. */
    sort_timer_lst(const sort_timer_lst&);
    sort_timer_lst& operator=(const sort_timer_lst&);

    /*
     * Insert `timer` somewhere after `lst_head`, which must be a non-NULL
     * node of the list whose expiry is not later than the new timer's.
     */
    void add_timer(util_timer* timer, util_timer* lst_head) {
        util_timer* prev = lst_head;
        util_timer* tmp = prev->next;
        while(tmp) {
            if(timer->expire < tmp->expire) {
                prev->next = timer;
                timer->next = tmp;
                tmp->prev = timer;
                timer->prev = prev;
                break;
            }
            prev = tmp;
            tmp = tmp->next;
        }
        /* Reached the end without finding a later timer: append as new tail. */
        if(!tmp) {
            prev->next = timer;
            timer->prev = prev;
            timer->next = NULL;
            tail = timer;
        }
    }
private:
    util_timer* head;
    util_timer* tail;
};

#endif
接下来我们看一下其实际应用——处理非活动的连接,在服务器运行时,对于很多已经建立的TCP连接并不是时时刻刻都有数据发送,所以我们对于每个TCP连接都要有一个定时器,一旦其没有数据的状态持续一定的时间就可以认为这是非活动的连接并且将之关闭,目的是节省服务器的资源:
1 /************************************************************************* 2 > File Name: 11-3.cpp 3 > Author: Torrance_ZHANG 4 > Mail: [email protected] 5 > Created Time: Sun 11 Feb 2018 04:23:50 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 #include"lst_timer.h" 10 using namespace std; 11 12 #define FD_LIMIT 65535 13 #define MAX_EVENT_NUMBER 1024 14 #define TIMESLOT 5 15 16 static int pipefd[2]; 17 static sort_timer_lst timer_lst; 18 static int epollfd = 0; 19 20 int setnonblocking(int fd) { 21 int old_option = fcntl(fd, F_GETFL); 22 int new_option = old_option | O_NONBLOCK; 23 fcntl(fd, F_SETFL, new_option); 24 return old_option; 25 } 26 27 void addfd(int epollfd, int fd) { 28 epoll_event event; 29 event.data.fd = fd; 30 event.events = EPOLLIN | EPOLLET; 31 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 32 setnonblocking(fd); 33 } 34 35 void sig_handler(int sig) { 36 int save_errno = errno; 37 int msg = sig; 38 send(pipefd[1], (char*)&msg, 1, 0); 39 errno = save_errno; 40 } 41 42 void addsig(int sig) { 43 struct sigaction sa; 44 memset(&sa, 0, sizeof(sa)); 45 sa.sa_handler = sig_handler; 46 sa.sa_flags |= SA_RESTART; 47 sigfillset(&sa.sa_mask); 48 assert(sigaction(sig, &sa, NULL) != -1); 49 } 50 51 void timer_handler() { 52 timer_lst.tick(); 53 alarm(TIMESLOT); 54 } 55 56 void cb_func(client_data* user_data) { 57 epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data -> sockfd, 0); 58 assert(user_data); 59 close(user_data -> sockfd); 60 printf("close fd %d\\n", user_data -> sockfd); 61 } 62 63 int main(int argc, char** argv) { 64 if(argc <= 2) { 65 printf("usage: %s ip_address port_number\\n", basename(argv[0])); 66 return 1; 67 } 68 const char* ip = argv[1]; 69 int port = atoi(argv[2]); 70 71 int ret = 0; 72 struct sockaddr_in address; 73 bzero(&address, sizeof(address)); 74 address.sin_family = AF_INET; 75 inet_pton(AF_INET, ip, &address.sin_addr); 76 address.sin_port = htons(port); 77 78 int listenfd = 
socket(AF_INET, SOCK_STREAM, 0); 79 assert(listenfd >= 0); 80 81 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 82 if(ret == -1) { 83 printf("errno is %d\\n", errno); 84 return 1; 85 } 86 ret = listen(listenfd, 5); 87 assert(ret != -1); 88 89 epoll_event events[MAX_EVENT_NUMBER]; 90 int epollfd = epoll_create(5); 91 assert(epollfd != -1); 92 addfd(epollfd, listenfd); 93 94 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); 95 assert(ret != -1); 96 setnonblocking(pipefd[1]); 97 addfd(epollfd, pipefd[0]); 98 99 addsig(SIGALRM); 100 addsig(SIGTERM); 101 bool stop_server = false; 102 client_data* users = new client_data[FD_LIMIT]; 103 bool timeout = false; 104 alarm(TIMESLOT); 105 106 while(!stop_server) { 107 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 108 if((number < 0) && (errno != EINTR)) { 109 printf("epoll failure\\n"); 110 break; 111 } 112 for(int i = 0; i < number; i ++) { 113 int sockfd = events[i].data.fd; 114 if(sockfd == listenfd) { 115 struct sockaddr_in client_address; 116 socklen_t client_addrlength = sizeof(address); 117 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 118 addfd(epollfd, connfd); 119 users[connfd].address = client_address; 120 users[connfd].sockfd = connfd; 121 122 util_timer* timer = new util_timer; 123 timer -> user_data = &users[connfd]; 124 timer -> cb_func = cb_func; 125 time_t cur = time(NULL); 126 timer -> expire = cur + 3 * TIMESLOT; 127 users[connfd].timer = timer; 128 timer_lst.add_timer(timer); 129 } 130 else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { 131 int sig; 132 char signals[1024]; 133 ret = recv(pipefd[0], signals, sizeof(signals), 0); 134 if(ret == -1) continue; 135 else if(ret == 0) continue; 136 else { 137 for(int i = 0; i < ret; i ++) { 138 switch(signals[i]) { 139 case SIGALRM: { 140 timeout = true; 141 break; 142 } 143 case SIGTERM: { 144 stop_server = true; 145 } 146 } 147 } 148 } 149 } 150 else if(events[i].events & 
EPOLLIN) { 151 memset(users[sockfd].buf, 0, BUFFER_SIZE); 152 ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE, 0); 153 printf("get %d bytes of client data %s from %d\\n", ret, users[sockfd].buf, sockfd); 154 155 util_timer* timer = users[sockfd].timer; 156 if(ret < 0) { 157 if(errno != EAGAIN) { 158 cb_func(&users[sockfd]); 159 if(timer) { 160 timer_lst.del_timer(timer); 161 } 162 } 163 } 164 else if(ret == 0) { 165 cb_func(&users[sockfd]); 166 if(timer) { 167 timer_lst.del_timer(timer); 168 } 169 } 170 else { 171 if(timer) { 172 time_t cur = time(NULL); 173 timer -> expire = cur + 3 * TIMESLOT; 174 printf("adjust timer once\\n"); 175 timer_lst.adjust_timer(timer); 176 } 177 } 178 } 179 else {} 180 } 181 if(timeout) { 182 timer_handler(); 183 timeout = false; 184 } 185 } 186 close(listenfd); 187 close(pipefd[1]); 188 close(pipefd[0]); 189 delete [] users; 190 return 0; 191 }
我们可以看到,客户端连接一段时间后没有数据发送,服务器端就自动关闭连接,以节省服务器资源。
前面介绍I/O复用系统调用时说过,它们的最后一个参数都是用来控制超时时间的(select使用timeval结构体,poll和epoll_wait使用以毫秒为单位的整数),所以我们也可以用这个参数来定时。但是I/O复用调用可能在超时之前就因为有事件发生而返回,因此我们需要不断更新这个定时参数,扣除已经经过的时间。用一段代码来说明一下:
/*************************************************************************
    > File Name: 11-4.cpp
    > Author: Torrance_ZHANG
    > Mail: [email protected]
    > Created Time: Mon 12 Feb 2018 01:49:52 AM PST
 ************************************************************************/

#include<iostream>
#include<errno.h>
#include<stdio.h>
#include<time.h>
#include<unistd.h>
#include<sys/epoll.h>
using namespace std;

#define TIMEOUT 5000
#define MAX_EVENT_NUMBER 1024

/*
 * Demonstrates using the timeout argument of epoll_wait as a timer.
 *
 * epoll_wait may return early because an event arrived, so after each
 * return the time already spent waiting is subtracted from the remaining
 * timeout. When epoll_wait returns 0 (or the remainder reaches zero) the
 * timeout has elapsed: the timed task would run and the timer is reset.
 * Elapsed time is measured with time(), so the resolution is one second.
 */
int main() {
    int epollfd = epoll_create(5);
    if(epollfd == -1) {
        printf("epoll_create failure\n");
        return 1;
    }
    epoll_event events[MAX_EVENT_NUMBER];

    time_t start = time(NULL);
    time_t end = time(NULL);
    int timeout = TIMEOUT;

    while(1) {
        printf("the timeout is now %d mil-seconds\n", timeout);
        start = time(NULL);
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);
        if((number < 0) && (errno != EINTR)) {
            printf("epoll failure\n");
            break;
        }
        /* 0 means the full timeout elapsed: run the timed task, reset the timer. */
        if(number == 0) {
            timeout = TIMEOUT;
            continue;
        }
        end = time(NULL);
        /* Returned early: deduct the time already spent waiting. */
        timeout -= (int)(end - start) * 1000;
        /* The timeout may also run out exactly as events arrive. */
        if(timeout <= 0) timeout = TIMEOUT;
        /* The `number` ready events in events[] would be handled here. */
    }
    close(epollfd);
    return 0;
}
前面我们提到的基于升序链表的定时器容器存在着一个问题,就是添加(以及调整)定时器的效率偏低:当定时器很多的时候,每次添加都要从头开始遍历链表,寻找应该插入的位置。而另一种定时器容器——时间轮——很好地解决了这个问题。
什么是时间轮,这个名字很形象,时间轮就像是一个轮子,我们虚拟一个指针指向轮子上的每一个槽,这个指针每隔一个时间间隔就走到下一个槽,这个时间间隔就是心搏时间,也是时间轮的槽时间si,而每一个槽其实就是一个指向一条定时器链表的头指针,每条链表上的定时器的定时时间相差N*si的整数倍,这个思想和哈希的思想很像。
/*************************************************************************
    > File Name: time_wheel_timer.h
    > Author: Torrance_ZHANG
    > Mail: [email protected]
    > Created Time: Mon 12 Feb 2018 05:13:13 AM PST
 ************************************************************************/

#ifndef _TIME_WHEEL_TIMER_H
#define _TIME_WHEEL_TIMER_H

#include<time.h>
#include<netinet/in.h>
#include<stdio.h>

#define BUFFER_SIZE 64
class tw_timer;

/* Per-connection state; `timer` is the timer currently guarding this connection. */
struct client_data {
    sockaddr_in address;
    int sockfd;
    char buf[BUFFER_SIZE];
    tw_timer* timer;
};

/* One timer, stored in the linked list of a single wheel slot. */
class tw_timer {
public:
    tw_timer(int rot, int ts) : rotation(rot), time_slot(ts), cb_func(NULL), user_data(NULL), next(NULL), prev(NULL) {}
public:
    int rotation;                   /* full wheel turns left before the timer fires */
    int time_slot;                  /* index of the slot this timer lives in */
    void (*cb_func)(client_data*);  /* called by tick() on expiry; skipped when NULL */
    client_data* user_data;         /* argument handed to cb_func */
    tw_timer* next;
    tw_timer* prev;
};

/*
 * Time wheel with N slots that advances one slot per tick() call; each slot
 * holds an unsorted doubly linked list of timers.
 *
 * The wheel owns its timers: they are created by add_timer() and deleted by
 * del_timer(), by tick() once they have fired, and by the destructor.
 */
class time_wheel {
public:
    time_wheel() : cur_slot(0) {
        for(int i = 0; i < N; ++ i) {
            slots[i] = NULL;
        }
    }
    ~time_wheel() {
        for(int i = 0; i < N; i ++) {
            tw_timer* tmp = slots[i];
            while(tmp) {
                slots[i] = tmp->next;
                delete tmp;
                tmp = slots[i];
            }
        }
    }
    /*
     * Create a timer that expires `timeout` time units from now (rounded
     * down to whole slot intervals, minimum one) and insert it at the head
     * of its slot. The caller fills in cb_func and user_data on the result.
     * Returns NULL when timeout is negative.
     */
    tw_timer* add_timer(int timeout) {
        if(timeout < 0) return NULL;
        int ticks = 0;
        if(timeout < SI) {
            ticks = 1;
        }
        else ticks = timeout / SI;
        int rotation = ticks / N;
        int ts = (cur_slot + (ticks % N)) % N;
        tw_timer* timer = new tw_timer(rotation, ts);
        if(!slots[ts]) {
            printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
            slots[ts] = timer;
        }
        else {
            timer->next = slots[ts];
            slots[ts]->prev = timer;
            slots[ts] = timer;
        }
        return timer;
    }
    /* Unlink `timer` from its slot and delete it. A NULL timer is ignored. */
    void del_timer(tw_timer* timer) {
        if(!timer) return;
        int ts = timer->time_slot;
        if(timer == slots[ts]) {
            slots[ts] = slots[ts]->next;
            if(slots[ts]) {
                slots[ts]->prev = NULL;
            }
            delete timer;
        }
        else {
            timer->prev->next = timer->next;
            if(timer->next) {
                timer->next->prev = timer->prev;
            }
            delete timer;
        }
    }
    /*
     * Process the current slot, then advance the wheel by one slot.
     * Timers with rotations left are only decremented; the others fire
     * their callback and are deleted.
     */
    void tick() {
        tw_timer* tmp = slots[cur_slot];
        printf("current slot is %d\n", cur_slot);
        while(tmp) {
            printf("tick the timer once\n");
            if(tmp->rotation > 0) {
                tmp->rotation --;
                tmp = tmp->next;
            }
            else {
                if(tmp->cb_func) tmp->cb_func(tmp->user_data);
                if(tmp == slots[cur_slot]) {
                    printf("delete header in cur_slot\n");
                    slots[cur_slot] = tmp->next;
                    delete tmp;
                    if(slots[cur_slot]) {
                        slots[cur_slot]->prev = NULL;
                    }
                    tmp = slots[cur_slot];
                }
                else {
                    tmp->prev->next = tmp->next;
                    if(tmp->next) {
                        tmp->next->prev = tmp->prev;
                    }
                    tw_timer* tmp2 = tmp->next;
                    delete tmp;
                    tmp = tmp2;
                }
            }
        }
        /* Avoid modifying cur_slot twice in one expression. */
        cur_slot = (cur_slot + 1) % N;
    }
private:
    /* The wheel owns raw pointers, so copying it would lead to double deletes. */
    time_wheel(const time_wheel&);
    time_wheel& operator=(const time_wheel&);
private:
    static const int N = 60;   /* number of slots on the wheel */
    static const int SI = 1;   /* slot interval: timeout units covered by one tick */
    tw_timer* slots[N];
    int cur_slot;
};

#endif
我们来将11-3的程序修改一下,使用时间轮来进行定时:
1 /************************************************************************* 2 > File Name: 11-3.cpp 3 > Author: Torrance_ZHANG 4 > Mail: [email protected] 5 > Created Time: Sun 11 Feb 2018 04:23:50 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 #include"time_wheel_timer.h" 10 using namespace std; 11 12 #define FD_LIMIT 65535 13 #define MAX_EVENT_NUMBER 1024 14 #define TIMESLOT 1 15 16 static int pipefd[2]; 17 static time_wheel t_wheel; 18 static int epollfd = 0; 19 20 int setnonblocking(int fd) { 21 int old_option = fcntl(fd, F_GETFL); 22 int new_option = old_option | O_NONBLOCK; 23 fcntl(fd, F_SETFL, new_option); 24 return old_option; 25 } 26 27 void addfd(int epollfd, int fd) { 28 epoll_event event; 29 event.data.fd = fd; 30 event.events = EPOLLIN | EPOLLET; 31 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 32 setnonblocking(fd); 33 } 34 35 void sig_handler(int sig) { 36 int save_errno = errno; 37 int msg = sig; 38 send(pipefd[1], (char*)&msg, 1, 0); 39 errno = save_errno; 40 } 41 42 void addsig(int sig) { 43 struct sigaction sa; 44 memset(&sa, 0, sizeof(sa)); 45 sa.sa_handler = sig_handler; 46 sa.sa_flags |= SA_RESTART; 47 sigfillset(&sa.sa_mask); 48 assert(sigaction(sig, &sa, NULL) != -1); 49 } 50 51 void timer_handler() { 52 t_wheel.tick(); 53 alarm(TIMESLOT); 54 } 55 56 void cb_func(client_data* user_data) { 57 epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data -> sockfd, 0); 58 assert(user_data); 59 close(user_data -> sockfd); 60 printf("close fd %d\\n", user_data -> sockfd); 61 } 62 63 int main(int argc, char** argv) { 64 if(argc <= 2) { 65 printf("usage: %s ip_address port_number\\n", basename(argv[0])); 66 return 1; 67 } 68 const char* ip = argv[1]; 69 int port = atoi(argv[2]); 70 71 int ret = 0; 72 struct sockaddr_in address; 73 bzero(&address, sizeof(address)); 74 address.sin_family = AF_INET; 75 inet_pton(AF_INET, ip, &address.sin_addr); 76 address.sin_port = htons(port); 77 78 int listenfd = 
socket(AF_INET, SOCK_STREAM, 0); 79 assert(listenfd >= 0); 80 81 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 82 if(ret == -1) { 83 printf("errno is %d\\n", errno); 84 return 1; 85 } 86 ret = listen(listenfd, 5); 87 assert(ret != -1); 88 89 epoll_event events[MAX_EVENT_NUMBER]; 90 int epollfd = epoll_create(5); 91 assert(epollfd != -1); 92 addfd(epollfd, listenfd); 93 94 ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); 95 assert(ret != -1); 96 setnonblocking(pipefd[1]); 97 addfd(epollfd, pipefd[0]); 98 99 addsig(SIGALRM); 100 addsig(SIGTERM); 101 bool stop_server = false; 102 client_data* users = new client_data[FD_LIMIT]; 103 bool timeout = false; 104 alarm(TIMESLOT); 105 106 while(!stop_server) { 107 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 108 if((number < 0) && (errno != EINTR)) { 109 printf("epoll failure\\n"); 110 break; 111 } 112 for(int i = 0; i < number; i ++) { 113 int sockfd = events[i].data.fd; 114 if(sockfd == listenfd) { 115 struct sockaddr_in client_address; 116 socklen_t client_addrlength = sizeof(address); 117 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 118 addfd(epollfd, connfd); 119 users[connfd].address = client_address; 120 users[connfd].sockfd = connfd; 121 122 tw_timer* new_timer = t_wheel.add_timer(15 * TIMESLOT); 123 new_timer -> user_data = &users[connfd]; 124 new_timer -> cb_func = cb_func; 125 users[connfd].timer = new_timer; 126 } 127 else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { 128 int sig; 129 char signals[1024]; 130 ret = recv(pipefd[0], signals, sizeof(signals), 0); 131 if(ret == -1) continue; 132 else if(ret == 0) continue; 133 else { 134 for(int i = 0; i < ret; i ++) { 135 switch(signals[i]) { 136 case SIGALRM: { 137 timeout = true; 138 break; 139 } 140 case SIGTERM: { 141 stop_server = true; 142 } 143 } 144 } 145 } 146 } 147 else if(events[i].events & EPOLLIN) { 148 memset(users[sockfd].buf, 0, BUFFER_SIZE); 149 ret = 
recv(sockfd, users[sockfd].buf, BUFFER_SIZE, 0); 150 printf("get %d bytes of client data %s from %d\\n", ret, users[sockfd].buf, sockfd); 151 152 tw_timer* timer = users[sockfd].timer; 153 if(ret < 0) { 154 if(errno != EAGAIN) { 155 cb_func(&users[sockfd]); 156 if(timer) { 157 t_wheel.del_timer(timer); 158 } 159 } 160 } 161 else if(ret == 0) { 162 cb_func(&users[sockfd]); 163 if(timer) { 164 t_wheel.del_timer(timer); 165 } 166 } 167 else { 168 if(timer) { 169 t_wheel.del_timer(timer); 170 tw_timer* new_timer = t_wheel.add_timer(15 * TIMESLOT); 171 new_timer -> user_data = &users[sockfd]; 172 new_timer -> cb_func = cb_func; 173 users[sockfd].timer = new_timer; 174 } 175 } 176 } 177 else {} 178 } 179 if(timeout) { 180 timer_handler(); 181 timeout = false; 182 } 183 } 184 close(listenfd); 185 close(pipefd[1]); 186 close(pipefd[0]); 187 delete [] users; 188 return 0; 189 }
时间轮每秒转动一格,当15s没有消息发送的时候就会自动断开连接。对于时间轮而言,添加和删除一个定时器的时间复杂度是O(1),执行一个定时器的时间复杂度表面上看是O(n),但是由于我们将所有定时器散列到不同的链表中,所以每条链表上的定时器其实很少,当使用多个时间轮的时候其复杂度可以降至接近O(1),所以其效率是很高的。
对于时间堆,有数据结构基础的话其实很好理解,就是一个基于时间排序的小根堆,每一次添加操作可以用O(lgn)的复杂度完成,而执行和删除堆顶的操作复杂度为O(1),只是删除后需要将堆进行调整,其复杂度也约为O(lgn)。整体来看时间堆的复杂度也是很好的,我们来看一下代码:
/*************************************************************************
    > File Name: min_heap.h
    > Author: Torrance_ZHANG
    > Mail: [email protected]
    > Created Time: Mon 12 Feb 2018 09:31:19 PM PST
 ************************************************************************/

#ifndef _MIN_HEAP_H
#define _MIN_HEAP_H

#include<iostream>
#include<exception>
#include<netinet/in.h>
#include<time.h>

using std::exception;

#define BUFFER_SIZE 64
class heap_timer;

/* Per-connection state; `timer` is the timer currently guarding this connection. */
struct client_data {
    sockaddr_in address;
    int sockfd;
    char buf[BUFFER_SIZE];
    heap_timer* timer;
};

/* One timer stored in the heap. */
class heap_timer {
public:
    /* Create a timer expiring `delay` seconds from now, with no callback yet. */
    heap_timer(int delay) : expire(time(NULL) + delay), cb_func(NULL), user_data(NULL) {}
public:
    time_t expire;                  /* absolute expiry time, compared against time(NULL) */
    void (*cb_func)(client_data*);  /* called by tick() on expiry; NULL means cancelled */
    client_data* user_data;         /* argument handed to cb_func */
};

/*
 * Array-based binary min-heap of timers ordered by `expire`.
 *
 * The heap owns the timers stored in it: they are deleted by pop_timer(),
 * by tick() once they have expired, and by the destructor. Allocation
 * failures surface as the std::bad_alloc thrown by operator new.
 */
class time_heap {
public:
    /* Create an empty heap with room for `cap` timers; throws if cap is negative. */
    time_heap(int cap) : array(NULL), capacity(cap), cur_size(0) {
        if(cap < 0) throw std::exception();
        array = new heap_timer* [capacity];
        for(int i = 0; i < capacity; i ++) {
            array[i] = NULL;
        }
    }
    /*
     * Build a heap from the first `size` entries of init_array, taking
     * ownership of those timers. Throws if cap is smaller than size or
     * size is negative.
     */
    time_heap(heap_timer** init_array, int size, int cap) : array(NULL), capacity(cap), cur_size(size) {
        if((size < 0) || (cap < size)) throw std::exception();
        array = new heap_timer* [capacity];
        for(int i = 0; i < capacity; i ++) {
            array[i] = NULL;
        }
        if(size != 0) {
            for(int i = 0; i < size; i ++) {
                array[i] = init_array[i];
            }
            /* Restore the heap property bottom-up, starting at the last parent. */
            for(int i = (cur_size - 1) / 2; i >= 0; -- i) {
                percolate_down(i);
            }
        }
    }
    ~time_heap() {
        for(int i = 0; i < cur_size; i ++) {
            delete array[i];
        }
        delete [] array;
    }
public:
    /* Insert `timer`, growing the array when it is full. A NULL timer is ignored. */
    void add_timer(heap_timer* timer) {
        if(!timer) return;
        if(cur_size >= capacity) resize();
        /* Sift up: move parents down until the hole fits the new timer. */
        int hole = cur_size ++;
        int parent = 0;
        for(; hole > 0; hole = parent) {
            parent = (hole - 1) / 2;
            if(array[parent]->expire <= timer->expire) break;
            array[hole] = array[parent];
        }
        array[hole] = timer;
    }
    /*
     * Cancel `timer` lazily: only its callback is cleared and the entry
     * stays in the heap until it reaches the top. This keeps deletion
     * cheap at the price of letting the array grow (space for time).
     */
    void del_timer(heap_timer* timer) {
        if(!timer) return;
        timer->cb_func = NULL;
    }
    /* Return the timer expiring first, or NULL when the heap is empty. */
    heap_timer* top() const {
        if(empty()) return NULL;
        return array[0];
    }
    /* Delete the timer at the top of the heap; does nothing when empty. */
    void pop_timer() {
        if(empty()) return;
        delete array[0];
        -- cur_size;
        /* Move the last element to the root and clear the vacated slot, so
         * that no slot keeps pointing at a deleted or duplicated timer. */
        array[0] = array[cur_size];
        array[cur_size] = NULL;
        if(!empty()) percolate_down(0);
    }
    /* Fire and delete every timer whose expiry time has been reached. */
    void tick() {
        time_t cur = time(NULL);
        while(!empty()) {
            heap_timer* tmp = array[0];
            if(!tmp) break;
            if(tmp->expire > cur) break;
            /* A NULL callback marks a timer cancelled through del_timer(). */
            if(tmp->cb_func) tmp->cb_func(tmp->user_data);
            pop_timer();
        }
    }
    bool empty() const {
        return cur_size == 0;
    }
private:
    /* The heap owns raw pointers, so copying it would lead to double deletes. */
    time_heap(const time_heap&);
    time_heap& operator=(const time_heap&);

    /* Sift the element at `hole` down until both children expire later. */
    void percolate_down(int hole) {
        heap_timer* temp = array[hole];
        int child = 0;
        for(; ((hole * 2 + 1) <= (cur_size - 1)); hole = child) {
            child = hole * 2 + 1;
            /* Pick the child that expires first. */
            if((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire)) ++child;
            if(array[child]->expire < temp->expire) array[hole] = array[child];
            else break;
        }
        array[hole] = temp;
    }
    /* Double the capacity (a capacity of 0 grows to 1). */
    void resize() {
        int new_capacity = (capacity > 0) ? (2 * capacity) : 1;
        heap_timer** temp = new heap_timer* [new_capacity];
        for(int i = 0; i < new_capacity; i ++) {
            temp[i] = NULL;
        }
        for(int i = 0; i < cur_size; i ++) {
            temp[i] = array[i];
        }
        delete [] array;
        array = temp;
        capacity = new_capacity;
    }
private:
    heap_timer** array;
    int capacity;
    int cur_size;
};

#endif