13.1 多线程操作共享内存生产者消费者模型多线程服务器框架
Posted 天道酬勤
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13.1 多线程操作共享内存生产者消费者模型多线程服务器框架相关的知识,希望对你有一定的参考价值。
生产者消费者模型如下:
程序如下:
1 #include <unistd.h> 2 #include <sys/types.h> 3 4 #include <stdlib.h> 5 #include <stdio.h> 6 #include <errno.h> 7 #include <string.h> 8 9 #include <pthread.h> 10 11 int g_Count = 0; 12 13 int nNum, nLoop; 14 15 //定义锁 16 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 17 18 //定义条件并初始化 19 pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER; 20 21 #define CUSTOM_COUNT 2 22 #define PRODUCT_COUNT 4 23 24 25 // int pthread_mutex_lock(pthread_mutex_t *mutex); 26 // int pthread_mutex_trylock(pthread_mutex_t *mutex); 27 // int pthread_mutex_unlock(pthread_mutex_t *mutex); 28 29 /* 30 int pthread_cond_timedwait(pthread_cond_t *restrict cond, 31 pthread_mutex_t *restrict mutex, 32 const struct timespec *restrict abstime); 33 int pthread_cond_wait(pthread_cond_t *restrict cond, 34 pthread_mutex_t *restrict mutex); 35 */ 36 37 //posix 线程库的函数 线程库 38 void *consume(void* arg) 39 { 40 41 int inum = 0; 42 inum = (int)arg; 43 while(1) 44 { 45 pthread_mutex_lock(&mutex); 46 printf("consum:%d\\n", inum); 47 while (g_Count == 0) //while 醒来以后需要重新判断 条件g_Count是否满足,如果不满足,再次wait 48 { 49 printf("consum:%d 开始等待\\n", inum); 50 pthread_cond_wait(&my_condition, &mutex); //api做了三件事情 //pthread_cond_wait假醒 51 printf("consum:%d 醒来\\n", inum); 52 } 53 54 printf("consum:%d 消费产品begin\\n", inum); 55 g_Count--; //消费产品 56 printf("consum:%d 消费产品end\\n", inum); 57 58 pthread_mutex_unlock(&mutex); 59 60 sleep(1); 61 } 62 63 pthread_exit(0); 64 65 } 66 67 //生产者线程 68 // 69 void *produce(void* arg) 70 { 71 int inum = 0; 72 inum = (int)arg; 73 74 while(1) 75 { 76 77 /* 78 //因为是很多生产者调用produce,要保护全局变量g_Count,所以加锁 79 pthread_mutex_lock(&mutex); 80 if (g_Count > 20) 81 { 82 printf("produce:%d 产品太多,需要控制,休眠\\n", inum); 83 pthread_mutex_unlock(&mutex); 84 sleep(1); 85 continue; 86 } 87 else 88 { 89 pthread_mutex_unlock(&mutex); 90 } 91 */ 92 93 pthread_mutex_lock(&mutex); 94 printf("产品数量:%d\\n", g_Count); 95 printf("produce:%d 生产产品begin\\n", inum); 96 g_Count++; 97 //只要我生产出一个产品,就告诉消费者去消费 98 printf("produce:%d 生产产品end\\n", inum); 99 100 printf("produce:%d 发条件signal begin\\n", inum); 101 pthread_cond_signal(&my_condition); //通知,在条件上等待的线程 102 printf("produce:%d 发条件signal end\\n", inum); 103 104 pthread_mutex_unlock(&mutex); 105 sleep(1); 106 } 107 108 pthread_exit(0); 109 110 } 111 112 //结论:return arg 和 pthread_exit()的结果都可以让pthread_join 接过来 113 int main() 114 { 115 int i =0; 116 pthread_t tidArray[CUSTOM_COUNT+PRODUCT_COUNT+10]; 117 118 //创建消费者线程 119 for (i=0; i<CUSTOM_COUNT; i++) 120 { 121 pthread_create(&tidArray[i], NULL, consume, (void *)i); 122 } 123 124 sleep(1); 125 //创建生产线程 126 for (i=0; i<PRODUCT_COUNT; i++) 127 { 128 pthread_create(&tidArray[i+CUSTOM_COUNT], NULL, produce, (void*)i); 129 } 130 131 132 133 for (i=0; i<CUSTOM_COUNT+PRODUCT_COUNT; i++) 134 { 135 pthread_join(tidArray[i], NULL); //等待线程结束。。。 136 } 137 138 139 printf("进程也要结束1233\\n"); 140 141 return 0; 142 }
执行结果如下:
条件等待模型如下:
多线程访问共享内存,通过信号量同步:不同进程中的线程无法进行同步与互斥
1 #include <sys/types.h> 2 #include <unistd.h> 3 4 #include <stdlib.h> 5 #include <stdio.h> 6 #include <string.h> 7 8 #include <signal.h> 9 #include <errno.h> 10 #include <signal.h> 11 #include <sys/wait.h> 12 13 #include "myipc_sem.h" 14 #include "myipc_shm.h" 15 16 #include <pthread.h> 17 18 int g_key = 0x3333; 19 20 void TestFunc(int loopnum) 21 { 22 printf("loopnum:%d\\n", loopnum); 23 24 int ncount = 0; 25 int ret = 0; 26 int shmhdl = 0; 27 int *addr = NULL; 28 29 int semid = 0; 30 sem_open(g_key, &semid); 31 32 33 sem_p(semid); //临界区开始 34 // 35 ret = IPC_CreatShm(".", 0, &shmhdl); 36 37 ret =IPC_MapShm(shmhdl, (void **)&addr); 38 *((int *)addr) = *((int *)addr) + 1; 39 ncount = *((int *)addr); 40 printf("ncount:%d\\n", ncount); 41 //addr[0] = addr[0] +1; 42 ret =IPC_UnMapShm(addr); 43 //sleep(2); 44 45 sem_v(semid); //临界区开始 46 // 47 printf("进程正常退出:%d\\n", getpid()); 48 } 49 50 51 52 //posix 线程库的函数 线程库 53 void *thread_routine(void* arg) 54 { 55 printf("thread_routine start\\n"); 56 TestFunc(1); 57 pthread_exit(0); 58 59 } 60 61 int main(void ) 62 { 63 int res; 64 int procnum=10; 65 int loopnum = 100; 66 67 pthread_t tidArray[200]; 68 69 70 int i=0,j = 0; 71 72 printf("请输入要创建子进程的个数 : \\n"); 73 scanf("%d", &procnum); 74 75 printf("请输入让每个子进程测试多少次 :\\n"); 76 scanf("%d", &loopnum); 77 78 //共享内存创建 79 int ret = 0; 80 int shmhdl = 0; 81 ret = IPC_CreatShm(".", sizeof(int), &shmhdl); 82 if (ret != 0) 83 { 84 printf("func IPC_CreatShm() err:%d \\n", ret); 85 return ret; 86 } 87 88 89 //信号量的创建 90 int semid = 0; 91 ret = sem_creat(g_key, &semid); 92 if (ret != 0) 93 { 94 printf("func sem_creat() err:%d,重新按照open打开信号量 \\n", ret); 95 if (ret == SEMERR_EEXIST) 96 { 97 ret = sem_open(g_key, &semid); 98 if (ret != 0) 99 { 100 printf("按照打开的方式,重新获取sem失败:%d \\n", ret); 101 return ret; 102 } 103 } 104 else 105 { 106 return ret; 107 } 108 109 } 110 111 int val = 0; 112 ret = sem_getval(semid, &val); 113 if (ret != 0 ) 114 { 115 printf("func sem_getval() err:%d \\n", ret); 116 return ret; 117 } 118 printf("sem val:%d\\n", val); 119 getchar(); 120 121 for (i=0; i<procnum; i++) 122 { 123 124 //tmp.numId = i; //这说明 线程函数在使用了一个不断变化的内存空间。。。。 125 pthread_create(&tidArray[i], NULL, thread_routine, NULL); 126 } 127 128 for (i=0; i<procnum; i++) 129 { 130 pthread_join(tidArray[i], NULL); //等待线程结束。。。 131 } 132 133 134 printf("父进程退出 hello...\\n"); 135 return 0; 136 }
多线程访问共享内存,通过线程锁实现:
1 #include <sys/types.h> 2 #include <unistd.h> 3 4 #include <stdlib.h> 5 #include <stdio.h> 6 #include <string.h> 7 8 #include <signal.h> 9 #include <errno.h> 10 #include <signal.h> 11 #include <sys/wait.h> 12 13 #include "myipc_sem.h" 14 #include "myipc_shm.h" 15 16 #include <pthread.h> 17 18 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; 19 20 int g_key = 0x3333; 21 22 void TestFunc(int loopnum) 23 { 24 printf("loopnum:%d\\n", loopnum); 25 26 int ncount = 0; 27 int ret = 0; 28 int shmhdl = 0; 29 int *addr = NULL; 30 31 int semid = 0; 32 sem_open(g_key, &semid); 33 34 35 sem_p(semid); //临界区开始 36 // 37 ret = IPC_CreatShm(".", 0, &shmhdl); 38 39 ret =IPC_MapShm(shmhdl, (void **)&addr); 40 *((int *)addr) = *((int *)addr) + 1; 41 ncount = *((int *)addr); 42 printf("ncount:%d\\n", ncount); 43 //addr[0] = addr[0] +1; 44 ret =IPC_UnMapShm(addr); 45 //sleep(2); 46 47 sem_v(semid); //临界区开始 48 // 49 printf("进程正常退出:%d\\n", getpid()); 50 } 51 52 void TestFunc_threadMutex(int loopnum) 53 { 54 printf("loopnum:%d\\n", loopnum); 55 56 int ncount = 0; 57 int ret = 0; 58 int shmhdl = 0; 59 int *addr = NULL; 60 61 int semid = 0; 62 sem_open(g_key, &semid); 63 64 65 //sem_p(semid); //临界区开始 66 pthread_mutex_lock(&mymutex); 67 // 68 ret = IPC_CreatShm(".", 0, &shmhdl); 69 70 ret =IPC_MapShm(shmhdl, (void **)&addr); 71 *((int *)addr) = *((int *)addr) + 1; 72 ncount = *((int *)addr); 73 printf("ncount:%d\\n", ncount); 74 //addr[0] = addr[0] +1; 75 ret =IPC_UnMapShm(addr); 76 //sleep(2); 77 78 //sem_v(semid); //临界区开始 79 pthread_mutex_unlock(&mymutex); 80 // 81 printf("进程正常退出:%d\\n", getpid()); 82 } 83 84 85 86 87 //posix 线程库的函数 线程库 88 void *thread_routine(void* arg) 89 { 90 printf("thread_routine start\\n"); 91 //TestFunc(1); 92 TestFunc_threadMutex(1); 93 pthread_exit(0); 94 95 } 96 97 int main(void ) 98 { 99 int res; 100 int procnum=10; 101 int loopnum = 100; 102 103 pthread_t tidArray[1024*10]; 104 105 106 int i=0,j = 0; 107 108 printf("请输入要创建子进程的个数 : \\n"); 109 scanf("%d", &procnum); 110 111 printf("请输入让每个子进程测试多少次 :\\n"); 112 scanf("%d", &loopnum); 113 114 //共享内存创建 115 int ret = 0; 116 int shmhdl = 0; 117 ret = IPC_CreatShm(".", sizeof(int), &shmhdl); 118 if (ret != 0) 119 { 120 printf("func IPC_CreatShm() err:%d \\n", ret); 121 return ret; 122 } 123 124 125 //信号量的创建 126 int semid = 0; 127 ret = sem_creat(g_key, &semid); 128 if (ret != 0) 129 { 130 printf("func sem_creat() err:%d,重新按照open打开信号量 \\n", ret); 131 if (ret == SEMERR_EEXIST) 132 { 133 ret = sem_open(g_key, &semid); 134 if (ret != 0) 135 { 136 printf("按照打开的方式,重新获取sem失败:%d \\n", ret); 137 return ret; 138 } 139 } 140 else 141 { 142 return ret; 143 } 144 145 } 146 147 int val = 0; 148 ret = sem_getval(semid, &val); 149 if (ret != 0 ) 150 { 151 printf("func sem_getval() err:%d \\n", ret); 152 return ret; 153 } 154 printf("sem val:%d\\n", val); 155 Java总结——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件