linux生产者消费者问题
Posted chenyibin1995
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了linux生产者消费者问题相关的知识,希望对你有一定的参考价值。
(多进程+共享内存+信号量)
一.分析
生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:
1. 对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。
2. 一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。
为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。
二. 共享存储
共享存储是进程间通信的一种手段,通常,使用信号量同步或互斥访问共享存储。共享存储的原理是将进程的地址空间映射到一个共享存储段。在LINUX下,通过使用 shmget 函数创建或者获取共享内存。
1. 创建
1)不指定 KEY
// IPC_PRIVATE指出需要创建内存;//SHM_SIZE 指出字节大小;
//SHM_MODE 指出访问权限字如 0600表示,用户可以读写该内存
int shmget(key_t IPC_PRIVATE,size_t SHM_SIZE,int SHM_MODE);
2)指定KEY
//如果SHM_KEY指向的共享存储已经存在,则返回共享存储的ID;
//否则,创建共享存储并返回其ID
int shmget(key_t SHM_KEY,size_t SHM_SIZE,int SHM_MODE);
2. 访问
方法一
只需要共享存储的 ID 就可以通过 shmat 函数获得共享存储所占用的实际地址。因此,可以在父进程的栈中用变量存放指向共享存储的指针,那么 fork 之后,子进程就可以很方便地通过这个指针访问共享存储了。
方法二
如果进程之间并没有父子关系,但是协商好了共享存储的 KEY , 那么在每个进程中,就可以通过 KEY 以及 shmget 函数获得共享存储的 I D , 进而通过 shmat 函数获得共享存储的实际地址,最后访问。
在我的实现中,我把生产者实现为父进程,消费者实现为子进程,并通过方法一实现进程之间共享内存。
三. 信号量集
信号量有两种原语 P 和 V ,P 锁定资源,V 释放资源。LINUX 下的使用信号量集合的接口特别复杂。我所用到的函数如下:
1. 创建或者获取信号量集合
// IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究
semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
// SEM_KEY 是 key_t 类型
//如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID
//如果不存在,则创建信号量集并返回ID
semget(SEM_KEY, NUM_OF_SEM,FLAGS);
2. 初始化信号量
创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。
semctl(int semSetId , int semIdx , int cmd, union semun su);
其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。
我在 UBUNTU 下使用 union semun 编译时总报错:
invalid use of undefined type ‘union semun’
据说是 Linux 下删除了 semun 的定义。可以通过自定义 semun 解决:
- #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
- #else
- union semun{
- int val;
- struct semid_ds *buf;
- unsigned short *array;
- };
- #endif
四.代码分解
1. 头文件
- #include "stdio.h" //支持 printf
- #include <sys/shm.h> //支持 shmget shmat 等
- #include <sys/sem.h> //支持 semget
- #include <stdlib.h> //支持 exit
2. 信号量
共需要三个信号量:
第一个信号量用于限制生产者必须在缓冲区不满时才能生产,是同步信号量
第二个信号量用于限制消费者必须在缓冲区有产品时才消费,是同步信号量
第三个信号量用于限制生产者和消费者在访问缓冲区时必须互斥,是互斥信号量
创建信号量集合,semget
- if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
- {
- perror("create semaphore failed");
- exit(1);
- }
初始化三个信号量,semctl,需要用到 union semun
- union semun su;
- su.val = N_BUFFER;
- if(semctl(semSetId,0,SETVAL, su) < 0){
- perror("semctl failed");
- exit(1);
- }
- su.val = 0;
- if(semctl(semSetId,1,SETVAL,su) < 0){
- perror("semctl failed");
- exit(1);
- }
- su.val = 1;
- if(semctl(semSetId,2,SETVAL,su) < 0){
- perror("semctl failed");
- exit(1);
- }
封装对信号量集中的某个信号量的值的+1或者-1操作
- void waitSem(int semSetId,int semNum)
- {
- struct sembuf sb;
- sb.sem_num = semNum;
- sb.sem_op = -1;
- sb.sem_flg = SEM_UNDO;
- if(semop(semSetId,&sb,1) < 0){
- perror("waitSem failed");
- exit(1);
- }
- }
- void sigSem(int semSetId,int semNum)
- {
- struct sembuf sb;
- sb.sem_num = semNum;
- sb.sem_op = 1;
- sb.sem_flg = SEM_UNDO;
- if(semop(semSetId,&sb,1) < 0){
- perror("waitSem failed");
- exit(1);
- }
- }
3. 使用共享内存
- struct ShM{
- int start;
- int end;
- }* pSM;
- if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
- {
- perror("create shared memory failed");
- exit(1);
- }
- pSM = (struct ShM *)shmat(shmId,0,0);
- pSM->start = 0;
- pSM->end = 0;
4. 生产过程
- while(1)
- {
- waitSem(semSetId,0);
- waitSem(semSetId,2);
- produce();
- sigSem(semSetId,2);
- sleep(1);
- sigSem(semSetId,1);
- }
5. 消费过程
- while(1)
- {
- waitSem(semSetId,1);
- waitSem(semSetId,2);
- consume();
- sigSem(semSetId,2);
- sigSem(semSetId,0);
- sleep(2);
- }
五. 代码全文
- #include "stdio.h"
- #include <sys/shm.h>
- #include <sys/sem.h>
- #include <stdlib.h>
- #define SHM_SIZE (1024*1024)
- #define SHM_MODE 0600
- #define SEM_MODE 0600
- #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
- #else
- union semun{
- int val;
- struct semid_ds *buf;
- unsigned short *array;
- };
- #endif
- struct ShM{
- int start;
- int end;
- }* pSM;
- const int N_CONSUMER = 3;
- const int N_BUFFER = 5;
- int shmId = -1,semSetId=-1;
- union semun su;
- void waitSem(int semSetId,int semNum)
- {
- struct sembuf sb;
- sb.sem_num = semNum;
- sb.sem_op = -1;
- sb.sem_flg = SEM_UNDO;
- if(semop(semSetId,&sb,1) < 0){
- perror("waitSem failed");
- exit(1);
- }
- }
- void sigSem(int semSetId,int semNum)
- {
- struct sembuf sb;
- sb.sem_num = semNum;
- sb.sem_op = 1;
- sb.sem_flg = SEM_UNDO;
- if(semop(semSetId,&sb,1) < 0){
- perror("waitSem failed");
- exit(1);
- }
- }
- void produce()
- {
- int last = pSM->end;
- pSM->end = (pSM->end+1) % N_BUFFER;
- printf("生产 %d\\n",last);
- }
- void consume()
- {
- int last = pSM->start;
- pSM->start = (pSM->start + 1)%N_BUFFER;
- printf("消耗 %d\\n",last);
- }
- void init()
- {
- if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
- {
- perror("create shared memory failed");
- exit(1);
- }
- pSM = (struct ShM *)shmat(shmId,0,0);
- pSM->start = 0;
- pSM->end = 0;
- if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
- {
- perror("create semaphore failed");
- exit(1);
- }
- su.val = N_BUFFER;
- if(semctl(semSetId,0,SETVAL, su) < 0){
- perror("semctl failed");
- exit(1);
- }
- su.val = 0;
- if(semctl(semSetId,1,SETVAL,su) < 0){
- perror("semctl failed");
- exit(1);
- }
- su.val = 1;
- if(semctl(semSetId,2,SETVAL,su) < 0){
- perror("semctl failed");
- exit(1);
- }
- }
- int main()
- {
- int i = 0,child = -1;
- init();
- for(i = 0; i < N_CONSUMER; i++)
- {
- if((child = fork()) < 0)
- {
- perror("the fork failed");
- exit(1);
- }
- else if(child == 0)
- {
- printf("我是第 %d 个消费者子进程,PID = %d\\n",i,getpid());
- while(1)
- {
- waitSem(semSetId,1);
- waitSem(semSetId,2);
- consume();
- sigSem(semSetId,2);
- sigSem(semSetId,0);
- sleep(2);
- }
- break;
- }
- }
- if(child > 0)
- {
- while(1)
- {
- waitSem(semSetId,0);
- waitSem(semSetId,2);
- produce();
- sigSem(semSetId,2);
- sleep(1);
- sigSem(semSetId,1);
- }
- }
- return 0;
- }
以上是关于linux生产者消费者问题的主要内容,如果未能解决你的问题,请参考以下文章