13.1 多线程操作共享内存生产者消费者模型多线程服务器框架

Posted 天道酬勤

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13.1 多线程操作共享内存生产者消费者模型多线程服务器框架相关的知识,希望对你有一定的参考价值。

  生产者消费者模型如下:

 

程序如下:

  1 #include <unistd.h>
  2 #include <sys/types.h>
  3 
  4 #include <stdlib.h>
  5 #include <stdio.h>
  6 #include <errno.h>
  7 #include <string.h>
  8 
  9 #include <pthread.h>
 10 
 11 int g_Count = 0;
 12 
 13 int     nNum, nLoop;
 14 
 15 //定义锁
 16 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 17 
 18 //定义条件并初始化
 19 pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER;
 20 
 21 #define CUSTOM_COUNT 2
 22 #define PRODUCT_COUNT 4
 23 
 24 
 25   // int pthread_mutex_lock(pthread_mutex_t *mutex);
 26   //     int pthread_mutex_trylock(pthread_mutex_t *mutex);
 27    //    int pthread_mutex_unlock(pthread_mutex_t *mutex);
 28 
 29 /*
 30        int pthread_cond_timedwait(pthread_cond_t *restrict cond,
 31               pthread_mutex_t *restrict mutex,
 32               const struct timespec *restrict abstime);
 33        int pthread_cond_wait(pthread_cond_t *restrict cond,
 34               pthread_mutex_t *restrict mutex);
 35               */
 36 
 37 //posix 线程库的函数 线程库
 38 void *consume(void* arg)
 39 {
 40     
 41     int inum = 0;
 42     inum = (int)arg;
 43     while(1)
 44     {
 45         pthread_mutex_lock(&mutex);
 46         printf("consum:%d\\n", inum);
 47         while (g_Count == 0)  //while 醒来以后需要重新判断 条件g_Count是否满足,如果不满足,再次wait
 48         {
 49             printf("consum:%d 开始等待\\n", inum);
 50             pthread_cond_wait(&my_condition, &mutex); //api做了三件事情 //pthread_cond_wait假醒
 51             printf("consum:%d 醒来\\n", inum);
 52         }
 53     
 54         printf("consum:%d 消费产品begin\\n", inum);
 55         g_Count--; //消费产品
 56         printf("consum:%d 消费产品end\\n", inum);
 57         
 58         pthread_mutex_unlock(&mutex);
 59     
 60         sleep(1);
 61     }
 62 
 63     pthread_exit(0);
 64 
 65 } 
 66 
 67 //生产者线程
 68 //
 69 void *produce(void* arg)
 70 {
 71     int inum = 0;
 72     inum = (int)arg;
 73     
 74     while(1)
 75     {
 76     
 77     /*
 78         //因为是很多生产者调用produce,要保护全局变量g_Count,所以加锁 
 79         pthread_mutex_lock(&mutex);
 80         if (g_Count > 20)
 81         {
 82             printf("produce:%d 产品太多,需要控制,休眠\\n", inum);
 83             pthread_mutex_unlock(&mutex);
 84             sleep(1);
 85             continue;
 86         }
 87         else
 88         {
 89             pthread_mutex_unlock(&mutex);
 90         }
 91         */
 92     
 93         pthread_mutex_lock(&mutex);
 94         printf("产品数量:%d\\n", g_Count);
 95         printf("produce:%d 生产产品begin\\n", inum);
 96         g_Count++;
 97         //只要我生产出一个产品,就告诉消费者去消费
 98         printf("produce:%d 生产产品end\\n", inum);
 99         
100         printf("produce:%d 发条件signal begin\\n", inum);
101         pthread_cond_signal(&my_condition); //通知,在条件上等待的线程 
102         printf("produce:%d 发条件signal end\\n", inum);
103         
104         pthread_mutex_unlock(&mutex);
105         sleep(1);
106     }
107     
108     pthread_exit(0);
109 
110 } 
111 
112 //结论:return arg 和 pthread_exit()的结果都可以让pthread_join 接过来
113 int main()
114 {
115     int        i =0;
116     pthread_t    tidArray[CUSTOM_COUNT+PRODUCT_COUNT+10];
117     
118     //创建消费者线程
119     for (i=0; i<CUSTOM_COUNT; i++)
120     {
121         pthread_create(&tidArray[i], NULL, consume, (void *)i);
122     }
123     
124     sleep(1);
125     //创建生产线程
126     for (i=0; i<PRODUCT_COUNT; i++)
127     {
128         pthread_create(&tidArray[i+CUSTOM_COUNT], NULL, produce, (void*)i);
129     }
130     
131     
132     
133     for (i=0; i<CUSTOM_COUNT+PRODUCT_COUNT; i++)
134     {
135         pthread_join(tidArray[i], NULL); //等待线程结束。。。
136     }
137     
138     
139     printf("进程也要结束1233\\n");
140     
141     return 0;
142 }

执行结果如下:

 条件等待模型如下:

 

多线程访问共享内存,通过信号量同步:不同进程中的线程无法进行同步与互斥

  1 #include <sys/types.h>
  2 #include <unistd.h>
  3 
  4 #include <stdlib.h>
  5 #include <stdio.h>
  6 #include <string.h>
  7 
  8 #include <signal.h>
  9 #include <errno.h>
 10 #include <signal.h>
 11 #include <sys/wait.h>
 12 
 13 #include "myipc_sem.h"
 14 #include "myipc_shm.h"
 15 
 16 #include <pthread.h>
 17 
 18 int g_key = 0x3333;
 19 
 20 void TestFunc(int loopnum)
 21 {
 22     printf("loopnum:%d\\n", loopnum);
 23     
 24     int ncount = 0;
 25     int ret = 0;
 26     int shmhdl = 0;
 27     int *addr = NULL;
 28     
 29     int semid = 0;
 30     sem_open(g_key, &semid);
 31 
 32 
 33      sem_p(semid); //临界区开始
 34     //
 35         ret = IPC_CreatShm(".", 0, &shmhdl);
 36         
 37         ret =IPC_MapShm(shmhdl, (void **)&addr);
 38         *((int *)addr) =  *((int *)addr)  + 1;
 39         ncount = *((int *)addr);
 40         printf("ncount:%d\\n", ncount);
 41         //addr[0] = addr[0] +1;
 42         ret =IPC_UnMapShm(addr);
 43         //sleep(2);
 44         
 45     sem_v(semid);  //临界区开始
 46     //
 47     printf("进程正常退出:%d\\n", getpid());
 48 }    
 49 
 50 
 51 
 52 //posix 线程库的函数 线程库
 53 void *thread_routine(void* arg)
 54 {
 55     printf("thread_routine start\\n");
 56     TestFunc(1);
 57     pthread_exit(0);
 58 
 59 } 
 60 
 61 int main(void )
 62 {
 63     int res;
 64     int procnum=10;
 65     int loopnum = 100;
 66     
 67     pthread_t tidArray[200];
 68 
 69 
 70     int  i=0,j = 0;
 71 
 72     printf("请输入要创建子进程的个数 : \\n");
 73     scanf("%d", &procnum);
 74 
 75     printf("请输入让每个子进程测试多少次 :\\n");
 76     scanf("%d", &loopnum);
 77     
 78     //共享内存创建
 79     int        ret = 0;
 80     int     shmhdl = 0;
 81     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
 82     if (ret != 0)
 83     {
 84         printf("func IPC_CreatShm() err:%d \\n", ret);
 85         return ret;
 86     }
 87     
 88     
 89     //信号量的创建
 90      int      semid = 0;
 91     ret = sem_creat(g_key, &semid);
 92     if (ret != 0)
 93     {
 94         printf("func sem_creat() err:%d,重新按照open打开信号量 \\n", ret);
 95         if (ret == SEMERR_EEXIST)
 96         {
 97             ret = sem_open(g_key, &semid);
 98             if (ret != 0)
 99             {
100                 printf("按照打开的方式,重新获取sem失败:%d \\n", ret);
101                 return ret;
102             }
103         }
104         else
105         {
106             return ret;
107         }
108         
109     }
110     
111     int  val = 0;
112     ret = sem_getval(semid, &val);
113     if (ret != 0 )
114     {
115         printf("func sem_getval() err:%d \\n", ret);
116         return ret;
117     }
118     printf("sem val:%d\\n", val);
119     getchar();
120     
121     for (i=0; i<procnum; i++)
122     {
123     
124         //tmp.numId = i; //这说明 线程函数在使用了一个不断变化的内存空间。。。。
125         pthread_create(&tidArray[i], NULL, thread_routine, NULL);
126     }
127     
128     for (i=0; i<procnum; i++)
129     {
130         pthread_join(tidArray[i], NULL); //等待线程结束。。。
131     }
132 
133 
134     printf("父进程退出 hello...\\n");
135     return 0;
136 }

多线程访问共享内存,通过线程锁实现:

  1 #include <sys/types.h>
  2 #include <unistd.h>
  3 
  4 #include <stdlib.h>
  5 #include <stdio.h>
  6 #include <string.h>
  7 
  8 #include <signal.h>
  9 #include <errno.h>
 10 #include <signal.h>
 11 #include <sys/wait.h>
 12 
 13 #include "myipc_sem.h"
 14 #include "myipc_shm.h"
 15 
 16 #include <pthread.h>
 17 
 18 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
 19 
 20 int g_key = 0x3333;
 21 
 22 void TestFunc(int loopnum)
 23 {
 24     printf("loopnum:%d\\n", loopnum);
 25     
 26     int ncount = 0;
 27     int ret = 0;
 28     int shmhdl = 0;
 29     int *addr = NULL;
 30     
 31     int semid = 0;
 32     sem_open(g_key, &semid);
 33 
 34 
 35      sem_p(semid); //临界区开始
 36     //
 37         ret = IPC_CreatShm(".", 0, &shmhdl);
 38         
 39         ret =IPC_MapShm(shmhdl, (void **)&addr);
 40         *((int *)addr) =  *((int *)addr)  + 1;
 41         ncount = *((int *)addr);
 42         printf("ncount:%d\\n", ncount);
 43         //addr[0] = addr[0] +1;
 44         ret =IPC_UnMapShm(addr);
 45         //sleep(2);
 46         
 47     sem_v(semid);  //临界区开始
 48     //
 49     printf("进程正常退出:%d\\n", getpid());
 50 }    
 51 
 52 void TestFunc_threadMutex(int loopnum)
 53 {
 54     printf("loopnum:%d\\n", loopnum);
 55     
 56     int ncount = 0;
 57     int ret = 0;
 58     int shmhdl = 0;
 59     int *addr = NULL;
 60     
 61     int semid = 0;
 62     sem_open(g_key, &semid);
 63 
 64 
 65      //sem_p(semid); //临界区开始
 66      pthread_mutex_lock(&mymutex);
 67     //
 68         ret = IPC_CreatShm(".", 0, &shmhdl);
 69         
 70         ret =IPC_MapShm(shmhdl, (void **)&addr);
 71         *((int *)addr) =  *((int *)addr)  + 1;
 72         ncount = *((int *)addr);
 73         printf("ncount:%d\\n", ncount);
 74         //addr[0] = addr[0] +1;
 75         ret =IPC_UnMapShm(addr);
 76         //sleep(2);
 77         
 78     //sem_v(semid);  //临界区开始
 79     pthread_mutex_unlock(&mymutex);
 80     //
 81     printf("进程正常退出:%d\\n", getpid());
 82 }    
 83 
 84 
 85 
 86 
 87 //posix 线程库的函数 线程库
 88 void *thread_routine(void* arg)
 89 {
 90     printf("thread_routine start\\n");
 91     //TestFunc(1);
 92     TestFunc_threadMutex(1);
 93     pthread_exit(0);
 94 
 95 } 
 96 
 97 int main(void )
 98 {
 99     int res;
100     int procnum=10;
101     int loopnum = 100;
102     
103     pthread_t tidArray[1024*10];
104 
105 
106     int  i=0,j = 0;
107 
108     printf("请输入要创建子进程的个数 : \\n");
109     scanf("%d", &procnum);
110 
111     printf("请输入让每个子进程测试多少次 :\\n");
112     scanf("%d", &loopnum);
113     
114     //共享内存创建
115     int        ret = 0;
116     int     shmhdl = 0;
117     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
118     if (ret != 0)
119     {
120         printf("func IPC_CreatShm() err:%d \\n", ret);
121         return ret;
122     }
123     
124     
125     //信号量的创建
126      int      semid = 0;
127     ret = sem_creat(g_key, &semid);
128     if (ret != 0)
129     {
130         printf("func sem_creat() err:%d,重新按照open打开信号量 \\n", ret);
131         if (ret == SEMERR_EEXIST)
132         {
133             ret = sem_open(g_key, &semid);
134             if (ret != 0)
135             {
136                 printf("按照打开的方式,重新获取sem失败:%d \\n", ret);
137                 return ret;
138             }
139         }
140         else
141         {
142             return ret;
143         }
144         
145     }
146     
147     int  val = 0;
148     ret = sem_getval(semid, &val);
149     if (ret != 0 )
150     {
151         printf("func sem_getval() err:%d \\n", ret);
152         return ret;
153     }
154     printf("sem val:%d\\n", val);
155 Java总结——通过Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

一个生产者+多消费者模型中的同步,共享内存

day8

生产者消费者模型 与多线程

多线程之生产者消费者模式

多线程——生产者和消费者