在多进程编程中,我们用fork系统调用创建子进程,值得注意的是,fork函数复制当前进程并在内核进程表中创建一个新的表项,其堆、栈指针,标志寄存器的值都和父进程相同,但是其ppid被设置成父进程pid,信号位图被清除。而子进程代码和父进程完全相同,其数据也会复制自父进程,但是其复制过程是写时复制,即父子任意进程对数据执行写操作时才会复制,首先是缺页中断,然后操作系统给子进程分配空间并复制数据。此外,创建子进程后父进程中打开的文件描述符在子进程中也是默认打开的,其文件描述符引用计数加一,父进程的用户根目录,当前工作目录等变量的引用计数均会加一。
僵尸进程是多进程编程中需要了解和避免的情况,僵尸进程的产生是由于子进程先于父进程退出,而此时父进程没有正常接收子进程退出状态,导致子进程脱离父进程存在于操作系统中,还占据了原有的进程描述符和资源,造成了资源的浪费。避免僵尸进程的产生共有三种方法:第一种是处理SIGCHLD信号,当子进程退出时会向父进程发送SIGCHLD信号,我们可以指明信号处理函数来进行处理;第二种是拖管法,由父进程创建一个中间进程,再由这个中间进程创建子进程后直接退出,这样子进程变成了孤儿进程,会由init进程托管,由init进程负责回收其资源,但是这样就破坏了父子进程之间的血缘关系;第三种就是阻塞法,调用wait函数等待子进程退出,缺点就是会使父进程进入阻塞状态。对于wait函数,其进入阻塞状态显然是我们所不允许的,所以waitpid函数就解决了这个问题。
在多进程编程中,一个最为重要的需要解决的问题就是进程间通信IPC,我们常用的IPC方法有:管道、信号量、共享内存和消息队列。其系统调用较为简单,我们只来简单说明一下这四种方式的区别和异同。首先管道是我们在前文就提到过的,而且讲过了再网络编程中经常使用socketpair函数创建一个双向管道,管道分为无名管道和有名管道,无名管道只能用于有亲缘关系的进程之间进行通信,而且只能用低级文件编程库中的读写函数,而有名管道就是创建一个管道文件,通过文件进行信息交流,所以没有亲缘关系的限制。信号量是用来实现多进程之间的互斥与同步的,其本质是一个整形的数,我们用pv操作来对其进行操作,如果是二进制的信号量,则可以用信号量保证对于临界资源来讲同一时刻只有一段代码能对其进行访问。共享内存是在内存空间创建或获取一段空间来让各个进程进行共享,通过这段空间进行信息交流,而这种方式就比较灵活,可以自定其数据结构,完成各式各样的功能。消息队列就是一个能存放消息的队列,各个进程将消息发送到消息队列中,并指明要发送的对象,其他程序可以直接监听这个队列,收取发给自己的消息。几种进程间通信的方式都很普遍,我们来看一个用共享内存实现的聊天室服务器程序:
1 /************************************************************************* 2 > File Name: 13-4.cpp 3 > Author: Torrance_ZHANG 4 > Mail: [email protected] 5 > Created Time: Wed 14 Feb 2018 04:18:04 PM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 #define USER_LIMIT 5 12 #define BUFFER_SIZE 1024 13 #define FD_LIMIT 65535 14 #define MAX_EVENT_NUMBER 1024 15 #define PROCESS_LIMIT 65536 16 17 struct client_data { 18 sockaddr_in address; 19 int connfd; 20 pid_t pid; 21 int pipefd[2]; 22 }; 23 24 static const char* shm_name = "/my_shm"; 25 int sig_pipefd[2]; 26 int epollfd; 27 int listenfd; 28 int shmfd; 29 char* share_mem = 0; 30 client_data* users = 0; 31 int* sub_process = 0; 32 int user_count = 0; 33 bool stop_child = false; 34 35 int setnonblocking(int fd) { 36 int old_option = fcntl(fd, F_GETFL); 37 int new_option = old_option | O_NONBLOCK; 38 fcntl(fd, F_SETFL, new_option); 39 return old_option; 40 } 41 42 void addfd(int epollfd, int fd) { 43 epoll_event event; 44 event.data.fd = fd; 45 event.events = EPOLLIN | EPOLLET; 46 epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); 47 setnonblocking(fd); 48 } 49 50 void sig_handler(int sig) { 51 int save_errno = errno; 52 int msg = sig; 53 send(sig_pipefd[1], (char*)&msg, 1, 0); 54 errno = save_errno; 55 } 56 57 void addsig(int sig, void(*handler)(int), bool restart = true) { 58 struct sigaction sa; 59 memset(&sa, 0, sizeof(sa)); 60 sa.sa_handler = handler; 61 if(restart) sa.sa_flags |= SA_RESTART; 62 sigfillset(&sa.sa_mask); 63 assert(sigaction(sig, &sa, NULL) != -1); 64 } 65 66 void del_resource() { 67 close(sig_pipefd[0]); 68 close(sig_pipefd[1]); 69 close(listenfd); 70 close(epollfd); 71 shm_unlink(shm_name); 72 delete [] users; 73 delete [] sub_process; 74 } 75 76 void child_term_handler(int sig) { 77 stop_child = true; 78 } 79 80 int run_child(int idx, client_data* users, char* share_mem) { 81 epoll_event events[MAX_EVENT_NUMBER]; 82 int child_epollfd = epoll_create(5); 83 assert(child_epollfd != -1); 84 int connfd = users[idx].connfd; 85 addfd(child_epollfd, connfd); 86 int pipefd = users[idx].pipefd[1]; 87 addfd(child_epollfd, pipefd); 88 int ret; 89 addsig(SIGTERM, child_term_handler, false); 90 91 while(!stop_child) { 92 int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1); 93 if((number < 0) && (errno != EINTR)) { 94 printf("epoll failure\\n"); 95 break; 96 } 97 for(int i = 0; i < number; i ++) { 98 int sockfd = events[i].data.fd; 99 if((sockfd == connfd) && (events[i].events & EPOLLIN)) { 100 memset(share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE); 101 ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0); 102 if(ret < 0) { 103 if(errno != EAGAIN) { 104 stop_child = true; 105 } 106 } 107 else if(ret == 0) { 108 stop_child = true; 109 } 110 else { 111 send(pipefd, (char*)&idx, sizeof(idx), 0); 112 } 113 } 114 else if((sockfd == pipefd) && (events[i].events & EPOLLIN)) { 115 int client = 0; 116 ret = recv(sockfd, (char*)&client, sizeof(client), 0); 117 if(ret < 0) { 118 if(errno != EAGAIN) { 119 stop_child = true; 120 } 121 } 122 else if(ret == 0) { 123 stop_child = true; 124 } 125 else { 126 send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); 127 } 128 } 129 else continue; 130 } 131 } 132 close(connfd); 133 close(pipefd); 134 close(child_epollfd); 135 return 0; 136 } 137 138 int main(int argc, char** argv) { 139 if(argc <= 2) { 140 printf("usage: %s ip_address port_number\\n", basename(argv[0])); 141 return 1; 142 } 143 const char* ip = argv[1]; 144 int port = atoi(argv[2]); 145 146 int ret = 0; 147 struct sockaddr_in address; 148 bzero(&address, sizeof(address)); 149 address.sin_family = AF_INET; 150 inet_pton(AF_INET, ip, &address.sin_addr); 151 address.sin_port = htons(port); 152 153 listenfd = socket(AF_INET, SOCK_STREAM, 0); 154 assert(listenfd >= 0); 155 156 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 157 assert(ret != -1); 158 159 ret = listen(listenfd, 5); 160 assert(ret != -1); 161 162 user_count = 0; 163 users = new client_data[USER_LIMIT + 1]; 164 sub_process = new int[PROCESS_LIMIT]; 165 for(int i = 0; i < PROCESS_LIMIT; i ++) { 166 sub_process[i] = -1; 167 } 168 epoll_event events[MAX_EVENT_NUMBER]; 169 epollfd = epoll_create(5); 170 assert(epollfd != -1); 171 addfd(epollfd, listenfd); 172 173 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd); 174 assert(ret != -1); 175 setnonblocking(sig_pipefd[1]); 176 addfd(epollfd, sig_pipefd[0]); 177 178 addsig(SIGCHLD, sig_handler); 179 addsig(SIGTERM, sig_handler); 180 addsig(SIGINT, sig_handler); 181 addsig(SIGPIPE, SIG_IGN); 182 bool stop_server = false; 183 bool terminate = false; 184 185 shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666); 186 assert(shmfd != -1); 187 ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE); 188 assert(ret != -1); 189 190 share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0); 191 assert(share_mem != MAP_FAILED); 192 close(shmfd); 193 194 while(!stop_server) { 195 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); 196 if((number < 0) && (errno != EINTR)) { 197 printf("epoll failure\\n"); 198 break; 199 } 200 for(int i = 0; i < number; i ++) { 201 int sockfd = events[i].data.fd; 202 if(sockfd == listenfd) { 203 struct sockaddr_in client_address; 204 socklen_t client_addrlength = sizeof(client_address); 205 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 206 if(connfd < 0) { 207 printf("errno is: %d\\n", errno); 208 continue; 209 } 210 if(user_count >= USER_LIMIT) { 211 const char* info = "too many users\\n"; 212 printf("%s", info); 213 send(connfd, info, strlen(info), 0); 214 close(connfd); 215 continue; 216 } 217 users[user_count].address = client_address; 218 users[user_count].connfd = connfd; 219 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd); 220 assert(ret != -1); 221 pid_t pid = fork(); 222 if(pid < 0) { 223 close(connfd); 224 continue; 225 } 226 else if(pid == 0) { 227 close(epollfd); 228 close(listenfd); 229 close(users[user_count].pipefd[0]); 230 close(sig_pipefd[0]); 231 close(sig_pipefd[1]); 232 run_child(user_count, users, share_mem); 233 munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE); 234 exit(0); 235 } 236 else { 237 close(connfd); 238 close(users[user_count].pipefd[1]); 239 addfd(epollfd, users[user_count].pipefd[0]); 240 users[user_count].pid = pid; 241 sub_process[pid] = user_count; 242 user_count ++; 243 } 244 } 245 else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { 246 int sig; 247 char signals[1024]; 248 ret = recv(sig_pipefd[0], signals, sizeof(signals), 0); 249 if(ret == -1) continue; 250 else if(ret == 0) continue; 251 else { 252 for(int i = 0; i < ret; i ++) { 253 switch(signals[i]) { 254 case SIGCHLD: { 255 pid_t pid; 256 int stat; 257 while((pid = waitpid(-1, &stat, WNOHANG)) > 0) { 258 int del_user = sub_process[pid]; 259 sub_process[pid] = -1; 260 if((del_user < 0) || (del_user > USER_LIMIT)) { 261 continue; 262 } 263 epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0); 264 close(users[del_user].pipefd[0]); 265 users[del_user] = users[-- user_count]; 266 sub_process[users[del_user].pid] = del_user; 267 } 268 if(terminate && user_count == 0) { 269 stop_server = true; 270 } 271 break; 272 } 273 case SIGTERM: 274 case SIGINT: { 275 printf("kill all the child now\\n"); 276 if(user_count == 0) { 277 stop_server = true; 278 break; 279 } 280 for(int i = 0; i < user_count; i ++) { 281 int pid = users[i].pid; 282 kill(pid, SIGTERM); 283 } 284 terminate = true; 285 break; 286 } 287 default : break; 288 } 289 } 290 } 291 } 292 else if(events[i].events & EPOLLIN) { 293 int child = 0; 294 ret = recv(sockfd, (char*)&child, sizeof(child), 0); 295 printf("read data from child accross pipe\\n"); 296 if(ret == -1) continue; 297 else if(ret == 0) continue; 298 else { 299 for(int j = 0; j < user_count; j ++) { 300 if(users[j].pipefd[0] != sockfd) { 301 printf("send data to child accross pipe\\n"); 302 send(users[j].pipefd[0], (char*)&child, sizeof(child), 0); 303 } 304 } 305 } 306 } 307 } 308 } 309 del_resource(); 310 return 0; 311 }
服务器接收到客户端数据时,通过管道告知专门为其他服务器服务的子进程向对应客户端发送数据。
我们知道,当父进程创建子进程之后,父进程中打开的文件描述符仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程。需要特别注意的是进程描述符的传递并不是单纯地传送一个值,而是要在接收进程中创建一个新的文件描述符,并且该文件描述符和发送进程中被传递的文件描述符指向内核中的同一个文件表项。那么问题就来了,我们如何从子进程将文件描述符传送给父进程呢?推广来说,如何在不相关的两个进程之间传送文件描述符呢?这时我们就需要利用UNIX域socket在进程间传递特殊的辅助数据,我们来看一个例子:
1 /************************************************************************* 2 > File Name: 13-5.cpp 3 > Author: Torrance_ZHANG 4 > Mail: [email protected] 5 > Created Time: Tue 27 Feb 2018 03:35:13 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 11 static const int CONTROL_LEN = CMSG_LEN(sizeof(int)); 12 13 void send_fd(int fd, int fd_to_send) { 14 struct iovec iov[1]; 15 struct msghdr msg; 16 char buf[0]; 17 18 iov[0].iov_base = buf; 19 iov[0].iov_len = 1; 20 msg.msg_name = NULL; 21 msg.msg_namelen = 0; 22 msg.msg_iov = iov; 23 msg.msg_iovlen = 1; 24 25 cmsghdr cm; 26 cm.cmsg_len = CONTROL_LEN; 27 cm.cmsg_level = SOL_SOCKET; 28 cm.cmsg_type = SCM_RIGHTS; 29 *(int *)CMSG_DATA(&cm) = fd_to_send; 30 msg.msg_control = &cm; 31 msg.msg_controllen = CONTROL_LEN; 32 33 sendmsg(fd, &msg, 0); 34 } 35 36 int recv_fd(int fd) { 37 struct iovec iov[1]; 38 struct msghdr msg; 39 char buf[0]; 40 41 iov[0].iov_base = buf; 42 iov[0].iov_len = 1; 43 msg.msg_name = NULL; 44 msg.msg_namelen = 0; 45 msg.msg_iov = iov; 46 msg.msg_iovlen = 1; 47 48 cmsghdr cm; 49 msg.msg_control = &cm; 50 msg.msg_controllen = CONTROL_LEN; 51 recvmsg(fd, &msg, 0); 52 53 int fd_to_read = *(int*)CMSG_DATA(&cm); 54 return fd_to_read; 55 } 56 57 int main() { 58 int pipefd[2]; 59 int fd_to_pass = 0; 60 int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd); 61 assert(ret != -1); 62 63 pid_t pid = fork(); 64 assert(pid >= 0); 65 66 if(pid == 0) { 67 close(pipefd[0]); 68 fd_to_pass = open("test.txt", O_RDWR, 0666); 69 send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0); 70 close(fd_to_pass); 71 exit(0); 72 } 73 74 close(pipefd[1]); 75 fd_to_pass = recv_fd(pipefd[0]); 76 char buf[1024]; 77 memset(buf, 0, sizeof(buf)); 78 read(fd_to_pass, buf, 1024); 79 printf("I got fd %d and data %s\\n", fd_to_pass, buf); 80 close(fd_to_pass); 81 }