Linux下的基于Pthread的多线程Socket编程

Posted Jack_Wang822

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux下的基于Pthread的多线程Socket编程相关的知识,希望对你有一定的参考价值。

程序设计

服务端的程序的设计主要是一个主线程首先调用Socket相关的函数socket,bind, listen在建立服务端的Socket之后,等待Accept上面,如果有新的客户端连接上来,则对于每一个客户端新建一个线程。在每一个客户端的线程中,其接收客户端发送的指令然后返回相关的信息,主线程Socket中默认的Accpet默认是阻塞的,这里采用了Linux中函数fctnl将其设置成非阻塞。除了客户端对应的线程和主线程之外,服务端还存在一个服务端的控制线程,其主要是接受服务端的命令,诸如退出,断开连接等等,对服务端的程序进行控制。

客户端的程序为一个阻塞的循环,接受用户的输入然后解析命令进行相关的操作,诸如连接服务端,发送数据等等操作

 

服务端实现细节

服务端的程序主要包括主线程接受客户端请求响应建立客户端对应线程,客户端命令执行,服务端主要控制模块。

主线程

首先建立服务端Socket连接,其过程如下:

  • 首先调用socket函数建立一个socket,其中传入的参数AF_INET参数表示其采用TCP协议,并且调用Linux系统提供的fcntl函数设置这个socket为非阻塞。
  • 然后初始化服务的地址,保护监听的IP(这里设置的监听服务器端所有的网卡),设置相应的端口以及协议类型(TCP),调用bind函数将其和前面的建立的socket绑定。
  • 设置该socket开始监听,这主要对应TCP连接上面的第一次握手,将TCP第一次握手成功的放在队列(SYN队列)中,其中传入的参数BACKLOG为Socket中队列的长度。
  • 然后调用Accept函数进行接收客户端的请求,其主要是从TCP上次握手成功的Accept队列中将客户端信息去除进行处理(此部分代码后面解释)。

最后服务端的主要是在一个循环中,其中最前面的serverExit主要是为了交互需要,后面会详细介绍。服务端调用Accept函数进行接收一个客户端的信息,如果存在连接上的客户端,则为其分配一个线程位置,然后调用pthread_create创建一个线程,并且将该客户端的信息参数传入线程。这里采用一个结构体的方式记录了客户端的相关信息,包括客户端IP,端口号,编号,是否连接等等。

这里为了防止服务器的无限的创建线程,设置了最大同时在线的客户端数目(MAXCONN),如果新连接上的客户端发现已经操作上线,那么该主线程就会睡眠,其等待在了一个客户端断开连接的信号量上面,如果有客户端断开连接,则主线程被唤醒,接受该客户端的连接。

需要注意的是这里涉及到很多的多线程的操作,很有可能发生数据冲突,需要很好使用pthread中的互斥锁防止发生数据冲突。

客户端服务线程

其主要是阻塞的接受客户端发送上来的命令,然后解析命令,对应每个命令进行不同的操作,将结果通过send函数发送给对应的客户端。这里主要实现了disconn断开连接,name返回客户端,list返回所有连接的客户端的信息(IP,端口,编号等等),send向别的客户端发送数据。
对于name和time命令只要将数据以字符串的形式发送给对应的客户端即可,而对于disconn命令,需要发送一个断开的信号,使得等待在这个信号量上的客户端能够连接上服务端,对于send命令需要根据发送的地址获取到目的的客户端信息,相对应的客户端发送相关的消息。服务端的程序另一个需要的是需要进行错误处理,诸如客户端的命令错误,格式错误,网络错误等等异常情况,其需要进行相应的报错,并且提供给客户端或者服务端相应的错误信息。
这里主要注意的是发送的数据需要指定数据的长度,而采用C的strlen指定的并不包括字符串最后的‘\0’,因此其不会发送出去。这里有两种解决方案,一种是接收方在数据的最后加上’\0’或者发送的时候将’\0’一同发送出去。 

服务端控制线程

服务端控制程序主要提供了一些命令给服务端可以对于服务端连接的客户端以及自身退出。这里主要实现了exit退出服务端,list列出所有连接的客户端信息,kill关闭某个客户端命令。其中exit命令为退出客户端,其主要是将一个退出退出标志位设置成1,然后主线程判断该标志位为1的时候会关闭所有的客户端,并且取消所有的客户端线程以及服务端程序线程,然后自身退出。对于list命令其和服务端发送的list指令实现相同,kill命令主要是关闭对于对应的客户端的socket,然后取消其对应的线程。

 

  1 #include <sys/types.h>
  2 #include <sys/socket.h>
  3 #include <netinet/in.h>
  4 #include <arpa/inet.h>
  5 #include <sys/time.h>
  6 #include <stdio.h>
  7 #include <errno.h>
  8 #include <string.h>
  9 #include <pthread.h>
 10 #include <stdlib.h>
 11 #include <fcntl.h>
 12 #include <unistd.h>
 13 
 14 #define PORT  8888
 15 #define BACKLOG 10
 16 #define MAXCONN 100
 17 #define BUFFSIZE 1024
 18 
 19 typedef unsigned char BYTE;
 20 typedef struct ClientInfo
 21 {
 22     struct sockaddr_in addr;
 23     int clientfd;
 24     int isConn;
 25     int index;
 26 } ClientInfo;
 27 
 28 pthread_mutex_t activeConnMutex;
 29 pthread_mutex_t clientsMutex[MAXCONN];
 30 pthread_cond_t connDis;
 31 
 32 pthread_t threadID[MAXCONN];
 33 pthread_t serverManagerID;
 34 
 35 ClientInfo clients[MAXCONN];
 36 
 37 int serverExit = 0;
 38 
 39 /*@brief Transform the all upper case 
 40 *
 41 */
 42 void tolowerString(char *s)
 43 {
 44     int i=0;
 45     while(i < strlen(s))
 46     {
 47         s[i] = tolower(s[i]);
 48         ++i;
 49     } 
 50 }
 51 
 52 void listAll(char *all)
 53 {
 54     int i=0, len = 0;
 55     len += sprintf(all+len, "Index   \t\tIP Address   \t\tPort\n");
 56     for(;i<MAXCONN;++i)
 57     {
 58         pthread_mutex_lock(&clientsMutex[i]);
 59         if(clients[i].isConn)
 60             len += sprintf(all+len, "%.8d\t\t%s\t\t%d\n",clients[i].index, inet_ntoa(clients[i].addr.sin_addr), clients[i].addr.sin_port);
 61         pthread_mutex_unlock(&clientsMutex[i]);
 62     }
 63 }
 64 
 65 void clientManager(void* argv)
 66 {
 67     ClientInfo *client = (ClientInfo *)(argv);
 68     
 69     BYTE buff[BUFFSIZE];
 70     int recvbytes;
 71     
 72     int i=0;
 73     int clientfd = client->clientfd;
 74     struct sockaddr_in addr = client->addr;
 75     int isConn = client->isConn;
 76     int clientIndex = client->index;
 77     
 78     while((recvbytes = recv(clientfd, buff, BUFFSIZE, 0)) != -1)
 79     {
 80      //   buff[recvbytes] = ‘\0‘;
 81         tolowerString(buff);    //case-insensitive
 82         
 83         char cmd[100];
 84         if((sscanf(buff, "%s", cmd)) == -1)    //command error
 85         { 
 86            char err[100];         
 87            if(send(clientfd, err, strlen(err)+1, 0) == -1)
 88            {
 89                strcpy(err, "Error command and please enter again!\n");
 90                fprintf(stdout, "%d sends an eroor command\n", clientfd);
 91                break;
 92            }
 93         }
 94         else    
 95         {
 96             char msg[BUFFSIZE]; //The message content
 97             int dest = clientIndex; //message destination
 98             int isMsg = 0;              //any message needed to send
 99             if(strcmp(cmd, "disconn") == 0)
100             {   
101                 pthread_cond_signal(&connDis);  //send a disconnetion signal and the waiting client can get response  
102                 break;
103             }
104             else if(strcmp(cmd, "time") == 0)
105             {
106                 time_t now;
107                 struct tm *timenow;
108                 time(&now);
109                 timenow = localtime(&now);
110                 strcpy(msg, asctime(timenow));
111                 isMsg = 1;
112             }
113             else if(strcmp(cmd, "name") == 0)
114             {
115                 strcpy(msg, "MACHINE NAME");
116                 isMsg = 1;
117             }
118             else if(strcmp(cmd, "list") == 0)
119             {                            
120                 listAll(msg);
121                 isMsg = 1;
122             }
123             else if(strcmp(cmd, "send") == 0)
124             {   
125                     
126                 if(sscanf(buff+strlen(cmd)+1, "%d%s", &dest, msg)==-1 || dest >= MAXCONN)
127                 {
128                     char err[100];
129                     strcpy(err, "Destination ID error and please use list to check and enter again!\n");
130                     fprintf(stderr, "Close %d client eroor: %s(errno: %d)\n", clientfd, strerror(errno), errno);
131                     break;
132                 }
133                 fprintf(stdout, "%d %s\n", dest, msg);   
134                 isMsg = 1;
135             }
136             else
137             {
138                 char err[100];
139                 strcpy(err, "Unknown command and please enter again!\n");
140                 fprintf(stderr, "Send to %d message eroor: %s(errno: %d)\n", clientfd, strerror(errno), errno);
141                 break;
142             }
143             
144             
145             if(isMsg)
146             {
147                 pthread_mutex_lock(&clientsMutex[dest]);
148                 if(clients[dest].isConn == 0)
149                 {
150                     sprintf(msg, "The destination is disconneted!");
151                     dest = clientIndex; 
152                 } 
153                             
154                 if(send(clients[dest].clientfd, msg, strlen(msg)+1, 0) == -1)
155                 {
156                     fprintf(stderr, "Send to %d message eroor: %s(errno: %d)\n", clientfd, strerror(errno), errno);
157                     pthread_mutex_unlock(&clientsMutex[dest]); 
158                     break;
159                 }
160                 printf("send successfully!\n");
161                 pthread_mutex_unlock(&clientsMutex[dest]); 
162             }
163         }  //end else       
164     }   //end while
165     
166     pthread_mutex_lock(&clientsMutex[clientIndex]);
167     client->isConn = 0;
168     pthread_mutex_unlock(&clientsMutex[clientIndex]);
169     
170     if(close(clientfd) == -1)
171         fprintf(stderr, "Close %d client eroor: %s(errno: %d)\n", clientfd, strerror(errno), errno);
172     fprintf(stderr, "Client %d connetion is closed\n", clientfd);
173     
174     pthread_exit(NULL);
175 }
176 
177 void serverManager(void* argv)
178 {
179     while(1)
180     {
181         char cmd[100];
182         scanf("%s", cmd);
183         tolowerString(cmd);
184         if(strcmp(cmd, "exit") == 0)
185             serverExit = 1;
186         else if(strcmp(cmd, "list") == 0)
187         {
188             char buff[BUFFSIZE];
189             listAll(buff);
190             fprintf(stdout, "%s", buff);
191         }
192         else if(strcmp(cmd, "kill") == 0)
193         {
194             int clientIndex;
195             scanf("%d", &clientIndex);
196             if(clientIndex >= MAXCONN)
197             {
198                 fprintf(stderr, "Unkown client!\n");
199                 continue;
200             }
201             pthread_mutex_lock(&clientsMutex[clientIndex]);
202             if(clients[clientIndex].isConn)
203             {
204                 if(close(clients[clientIndex].clientfd) == -1)
205                     fprintf(stderr, "Close %d client eroor: %s(errno: %d)\n", clients[clientIndex].clientfd, strerror(errno), errno);
206             }
207             else
208             {
209                 fprintf(stderr, "Unknown client!\n");
210             }
211             pthread_mutex_unlock(&clientsMutex[clientIndex]);
212             pthread_cancel(threadID[clientIndex]);
213                 
214         }
215         else
216         {
217             fprintf(stderr, "Unknown command!\n");
218         }
219     }
220 }
221 
222 int main()
223 {
224    int activeConn = 0;
225    
226    //initialize the mutex 
227    pthread_mutex_init(&activeConnMutex, NULL);   
228    pthread_cond_init(&connDis, NULL);
229    int i=0;
230    for(;i<MAXCONN;++i)
231        pthread_mutex_init(&clientsMutex[i], NULL); 
232    
233    for(i=0;i<MAXCONN;++i)
234        clients[i].isConn = 0; 
235        
236    //create the server manager thread
237    pthread_create(&serverManagerID, NULL, (void *)(serverManager), NULL);
238    
239    
240    int listenfd;
241    struct sockaddr_in  servaddr;
242     
243    //create a socket
244    if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
245    {
246        fprintf(stderr, "Create socket error: %s(errno: %d)\n", strerror(errno), errno);
247        exit(0);
248    }
249    else
250        fprintf(stdout, "Create a socket successfully\n");
251    
252    fcntl(listenfd, F_SETFL, O_NONBLOCK);       //set the socket non-block
253    
254    //set the server address
255    memset(&servaddr, 0, sizeof(servaddr));  //initialize the server address 
256    servaddr.sin_family = AF_INET;           //AF_INET means using TCP protocol
257    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);    //any in address(there may more than one network card in the server)
258    servaddr.sin_port = htons(PORT);            //set the port
259    
260    //bind the server address with the socket
261    if(bind(listenfd, (struct sockaddr*)(&servaddr), sizeof(servaddr)) == -1)
262    {
263         fprintf(stderr, "Bind socket error: %s(errno: %d)\n", strerror(errno), errno);
264         exit(0);
265    }
266    else
267        fprintf(stdout, "Bind socket successfully\n");
268    
269    //listen
270    if(listen(listenfd, BACKLOG) == -1)
271    {
272        fprintf(stderr, "Listen socket error: %s(errno: %d)\n", strerror(errno), errno);
273        exit(0);
274    }
275    else
276        fprintf(stdout, "Listen socket successfully\n");
277     
278    
279    while(1)
280    {
281        if(serverExit)
282        {
283            for(i=0;i<MAXCONN;++i)
284            {
285                if(clients[i].isConn)
286                {
287                    if(close(clients[i].clientfd) == -1)         //close the client 
288                        fprintf(stderr, "Close %d client eroor: %s(errno: %d)\n", clients[i].clientfd, strerror(errno), errno);
289                    if(pthread_cancel(threadID[i]) != 0)         //cancel the corresponding client thread
290                         fprintf(stderr, "Cancel %d thread eroor: %s(errno: %d)\n", (int)(threadID[i]), strerror(errno), errno);
291                }
292            }
293            return 0;    //main exit;
294        }
295        
296        pthread_mutex_lock(&activeConnMutex);
297        if(activeConn >= MAXCONN)
298             pthread_cond_wait(&connDis, &activeConnMutex);
299        pthread_mutex_unlock(&activeConnMutex);
300            
301        //find an empty postion for a new connnetion
302        int i=0;
303        while(i<MAXCONN)
304        {
305            pthread_mutex_lock(&clientsMutex[i]);
306            if(!clients[i].isConn)
307            {
308                pthread_mutex_unlock(&clientsMutex[i]);
309                break;
310            }
311            pthread_mutex_unlock(&clientsMutex[i]);
312            ++i;           
313        }   
314        
315        //accept
316        struct sockaddr_in addr;
317        int clientfd;
318        int sin_size = sizeof(struct sockaddr_in);
319        if((clientfd = accept(listenfd, (struct sockaddr*)(&addr), &sin_size)) == -1)
320        {   
321            sleep(1);        
322            //fprintf(stderr, "Accept socket error: %s(errno: %d)\n", strerror(errno), errno);
323            continue;
324            //exit(0);
325        }
326        else
327            fprintf(stdout, "Accept socket successfully\n");
328        
329        pthread_mutex_lock(&clientsMutex[i]);
330        clients[i].clientfd = clientfd;
331        clients[i].addr = addr;
332        clients[i].isConn = 1;
333        clients[i].index = i;
334        pthread_mutex_unlock(&clientsMutex[i]);
335        
336        //create a thread for a client
337        pthread_create(&threadID[i], NULL, (void *)clientManager, &clients[i]);     
338        
339    }    //end-while
340 }

 

客户端实现细节

客户端程序主要可以分成客户端主线程控制和服务端消息接受程序。

主线程

客户端的程序较服务端的程序则简单的多,主要是接受用户的指令按照指令执行相关的任务。这里主要实现了conn连接服务器,disconn断开服务器连接,list查询服务器上的所有连接,name查看服务器名字,time查看时间,send先其他的客户端发送消息这些命令。

客户端首先必须通过conn命令连接上对应的服务器,然后可以执行其他的指令。用户通过time,name查看时间和机器名,如果需要发送消息,可以通过list查看所有连接,然后向指定的连接发送消息,quit退出客户端。

服务器消息处理线程

由于主线程为一个阻塞的线程,因此这里创建了一个接受服务器端的消息的线程。当用户通过conn命令连接上服务器之后,主线程就会创建该接受线程,其不断的监听服务器发回的消息,然后将其输入的屏幕上。当用户通过disconn命令取消连接的时候,主线程则就会取消掉这个线程。下面是该线程相关的代码。

  1 #include <sys/types.h>
  2 #include <sys/socket.h>
  3 #include <stdio.h>
  4 #include <netinet/in.h>
  5 #include <arpa/inet.h>
  6 #include <errno.h>
  7 #include <unistd.h>
  8 #include <string.h>
  9 #include <stdlib.h>
 10 #include <pthread.h>
 11 
 12 #define BUFFERSIZE 1024
 13 typedef unsigned char BYTE;
 14 pthread_t receiveID;
 15 
 16 void tolowerString(char *s)
 17 {
 18     int i=0;
 19     while(i < strlen(s))
 20     {
 21         s[i] = tolower(s[i]);
 22         ++i;
 23     } 
 24 }
 25 
 26 void receive(void *argv)
 27 {
 28     int sockclient = *(int*)(argv);
 29     BYTE recvbuff[BUFFERSIZE];
 30     while(recv(sockclient, recvbuff, sizeof(recvbuff), 0)!=-1) //receive
 31     {
 32         fputs(recvbuff, stdout);
 33         fputs("\n", stdout);      
 34     }
 35     fprintf(stderr, "Receive eroor: %s(errno: %d)\n", strerror(errno), errno);
 36 }
 37 
 38 int main()
 39 {
 40     ///define sockfd
 41     int sockclient = socket(AF_INET,SOCK_STREAM, 0);
 42 
 43     ///definet sockaddr_in
 44     struct sockaddr_in servaddr;
 45     memset(&servaddr, 0, sizeof(servaddr));
 46     
 47     int isConn = 0;
 48     
 49     BYTE buff[BUFFERSIZE];
 50 
 51     
 52     while (fgets(buff, sizeof(buff), stdin) != NULL)
 53     {
 54         tolowerString(buff);
 55         char cmd[100], ip[100];
 56         int port;
 57         if(sscanf(buff, "%s", cmd) == -1)    //command error
 58         { 
 59             fprintf(stderr, "Input eroor: %s(errno: %d) And please input again\n", strerror(errno), errno);
 60             continue;
 61         }
 62         if(strcmp(cmd, "conn") == 0)        //connecton command
 63         {
 64             char ip[100];
 65             int port, ipLen=0;
 66             if(sscanf(buff+strlen(cmd)+1, "%s", ip) == -1)    //command error
 67             { 
 68                 fprintf(stderr, "Input eroor: %s(errno: %d) And please input again\n", strerror(errno), errno);
 69                 continue;
 70             }
 71             if((sscanf(buff+strlen(cmd)+strlen(ip)+2, "%d", &port)) == -1)    //command error
 72             { 
 73                 fprintf(stderr, "Input eroor: %s(errno: %d) And please input again\n", strerror(errno), errno);
 74                 

以上是关于Linux下的基于Pthread的多线程Socket编程的主要内容,如果未能解决你的问题,请参考以下文章

pthread

Linux 多线程编程

Linux多线程编程实例解析

Linux多线程编程实例解析

Linux多线程编程实例解析

Linux多线程编程实例解析