在windows操作系统中用信号量机制解决生产者消费者问题的代码

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在windows操作系统中用信号量机制解决生产者消费者问题的代码相关的知识,希望对你有一定的参考价值。

windows里面信号量比较难用,下面是我模仿一本教材写的一个例子,加了一些注释,在.net 2008环境下调试通过的。你可以参考下,不过直接复制过去是很难调的过的吧,还有其他一些相关文件。

// ProducerCustomer2.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include <windows.h>
#include <fstream> //与课本不同
#include <iostream> //与课本不同
#include <string>
#include <conio.h>

using namespace std;

//定义一些常量;
//本程序允许的最大临界区数;
#define MAX_BUFFER_NUM 10
//秒到毫秒的乘法因子;
#define INTE_PER_SEC 1000
//本程序允许的生产和消费线程的总数;
#define MAX_THREAD_NUM 64

//定义一个结构,记录在测试文件中指定的每一个线程的参数
struct ThreadInfo

int serial; //线程序列号
char entity; //是P还是C
double delay; //线程延迟
int thread_request[MAX_THREAD_NUM]; //线程请求队列
int n_request; //请求个数
;

//全局变量的定义

//临界区对象的声明,用于管理缓冲区的互斥访问;
CRITICAL_SECTION PC_Critical[MAX_BUFFER_NUM];
int Buffer_Critical[MAX_BUFFER_NUM]; //缓冲区声明,用于存放产品;
HANDLE h_Thread[MAX_THREAD_NUM]; //用于存储每个线程句柄的数组;
ThreadInfo Thread_Info[MAX_THREAD_NUM]; //线程信息数组;
HANDLE empty_semaphore; //一个信号量;
HANDLE h_mutex; //一个互斥量;
DWORD n_Thread = 0; //实际的线程的数目;
DWORD n_Buffer_or_Critical; //实际的缓冲区或者临界区的数目;
HANDLE h_Semaphore[MAX_THREAD_NUM]; //生产者允许消费者开始消费的信号量;

//生产消费及辅助函数的声明
void Produce(void *p);
void Consume(void *p);
bool IfInOtherRequest(int);
int FindProducePositon();
int FindBufferPosition(int);

int main(void)

//声明所需变量;
DWORD wait_for_all;
ifstream inFile;

//初始化缓冲区;
for(int i=0;i< MAX_BUFFER_NUM;i++)
Buffer_Critical[i] = -1;
//初始化每个线程的请求队列;
for(int j=0;j<MAX_THREAD_NUM;j++)
for(int k=0;k<MAX_THREAD_NUM;k++)
Thread_Info[j].thread_request[k] = -1;
Thread_Info[j].n_request = 0;

//初始化临界区;
for(int i =0;i<MAX_BUFFER_NUM;i++)
InitializeCriticalSection(&PC_Critical[i]);

//打开输入文件,按照规定的格式提取线程等信息;
inFile.open("test.txt");
//从文件中获得实际的缓冲区的数目;
inFile >> n_Buffer_or_Critical;
n_Buffer_or_Critical = inFile.get();
printf("输入文件是:\n");
//回显获得的缓冲区的数目信息;
printf("%d \n",(int) n_Buffer_or_Critical);
//提取每个线程的信息到相应数据结构中;
while(inFile)
inFile >> Thread_Info[n_Thread].serial;
inFile >> Thread_Info[n_Thread].entity;
inFile >> Thread_Info[n_Thread].delay;
char c;
inFile.get(c);
while(c!='\n'&& !inFile.eof())
inFile>> Thread_Info[n_Thread].thread_request[Thread_Info[n_Thread].n_request++];
inFile.get(c);

n_Thread++;


//回显获得的线程信息,便于确认正确性;
for(int j=0;j<(int) n_Thread;j++)
int Temp_serial = Thread_Info[j].serial;
char Temp_entity = Thread_Info[j].entity;
double Temp_delay = Thread_Info[j].delay;
printf(" \n thread%2d %c %f ",Temp_serial,Temp_entity,Temp_delay);
int Temp_request = Thread_Info[j].n_request;
for(int k=0;k<Temp_request;k++)
printf(" %d ", Thread_Info[j].thread_request[k]);
cout<<endl;

printf("\n\n");

//创建在模拟过程中几个必要的信号量
empty_semaphore=CreateSemaphore(NULL,n_Buffer_or_Critical,n_Buffer_or_Critical,
NULL);
h_mutex =CreateMutex(NULL,FALSE,NULL);

//下面这个循环用线程的ID号来为相应生产线程的产品读写时所
//使用的同步信号量命名;
for(int j=0;j<(int)n_Thread;j++)
std::string lp ="semaphore_for_produce_";
int temp =j;
while(temp)
char c = (char)(temp%10);
lp+=c;
temp/=10;

h_Semaphore[j+1]=CreateSemaphore(NULL,0,n_Thread,NULL);


//创建生产者和消费者线程;
for(int i =0;i< (int) n_Thread;i++)
if(Thread_Info[i].entity =='P')
h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Produce),
&(Thread_Info[i]),0,NULL);
else
h_Thread[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)(Consume),
&(Thread_Info[i]),0,NULL);


//主程序等待各个线程的动作结束;
wait_for_all = WaitForMultipleObjects(n_Thread,h_Thread,TRUE,-1);
printf(" \n \n全部生产者和消费者都已完成它们的工作. \n");
printf("按任意键返回!\n");
_getch();
return 0;


//确认是否还有对同一产品的消费请求未执行;
bool IfInOtherRequest(int req)

for(int i=0;i<n_Thread;i++)
for(int j=0;j<Thread_Info[i].n_request;j++)
if(Thread_Info[i].thread_request[j] == req)
return TRUE;

return FALSE;


//找出当前可以进行产品生产的空缓冲区位置;
int FindProducePosition()

int EmptyPosition;
for (int i =0;i<n_Buffer_or_Critical;i++)
if(Buffer_Critical[i] == -1)
EmptyPosition = i;
//用下面这个特殊值表示本缓冲区正处于被写状态;
Buffer_Critical[i] = -2;
break;

return EmptyPosition;


//找出当前所需生产者生产的产品的位置;
int FindBufferPosition(int ProPos)

int TempPos;
for (int i =0 ;i<n_Buffer_or_Critical;i++)
if(Buffer_Critical[i]==ProPos)
TempPos = i;
break;

return TempPos;


//生产者进程
void Produce(void *p)

//局部变量声明;
DWORD wait_for_semaphore,wait_for_mutex,m_delay;
int m_serial;

//获得本线程的信息;
m_serial = ((ThreadInfo*)(p))->serial;
m_delay = (DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC);

Sleep(m_delay);
//开始请求生产
printf("生产者 %2d 发送生产请求信号.\n",m_serial);

//确认有空缓冲区可供生产,同时将空位置数empty减1;用于生产者和消费者的同步;
wait_for_semaphore = WaitForSingleObject(empty_semaphore,-1);

//互斥访问下一个可用于生产的空临界区,实现写写互斥;
wait_for_mutex = WaitForSingleObject(h_mutex,-1);
int ProducePos = FindProducePosition();
ReleaseMutex(h_mutex);

//生产者在获得自己的空位置并做上标记后,以下的写操作在生产者之间可以并发执行;
//核心生产步骤中,程序将生产者的ID作为产品编号放入,方便消费者识别;
printf("生产者 %2d 开始在缓冲区 %2d 生产产品.\n",m_serial,ProducePos);
Buffer_Critical[ProducePos] = m_serial;
printf("生产者 %2d 完成生产过程 :\n ",m_serial);
printf(" 缓冲区[ %2d ]:%3d \n" ,ProducePos,Buffer_Critical[ProducePos]);
//使生产者写的缓冲区可以被多个消费者使用,实现读写同步;
ReleaseSemaphore(h_Semaphore[m_serial],n_Thread,NULL);


//消费者进程
void Consume(void * p)

//局部变量声明;
DWORD wait_for_semaphore,m_delay;
int m_serial,m_requestNum; //消费者线程的序列号和请求的数目;
int m_thread_request[MAX_THREAD_NUM];//本消费者线程的请求队列;

//提取本线程的信息到本地;
m_serial = ((ThreadInfo*)(p))->serial;
m_delay = (DWORD)(((ThreadInfo*)(p))->delay *INTE_PER_SEC);
m_requestNum = ((ThreadInfo *)(p))->n_request;
for (int i = 0;i<m_requestNum;i++)
m_thread_request[i] = ((ThreadInfo*)(p))->thread_request[i];

Sleep(m_delay);
//循环进行所需产品的消费
for(int i =0;i<m_requestNum;i++)

//请求消费下一个产品
printf("消费者 %2d 请求消费 %2d 产品\n",m_serial,m_thread_request[i]);
//如果对应生产者没有生产,则等待;如果生产了,允许的消费者数目-1;实现了读写同步;
wait_for_semaphore=WaitForSingleObject(h_Semaphore[m_thread_request[i]],-1);

//查询所需产品放到缓冲区的号
int BufferPos=FindBufferPosition(m_thread_request[i]);

//开始进行具体缓冲区的消费处理,读和读在该缓冲区上仍然是互斥的;
//进入临界区后执行消费动作;并在完成此次请求后,通知另外的消费者本处请求已
//经满足;同时如果对应的产品使用完毕,就做相应处理;并给出相应动作的界面提
//示;该相应处理指将相应缓冲区清空,并增加代表空缓冲区的信号量;
EnterCriticalSection(&PC_Critical[BufferPos]);
printf("消费者 %2d 开始消费 %2d 产品 \n",m_serial,m_thread_request[i]);
((ThreadInfo*)(p))->thread_request[i] =-1;
if(!IfInOtherRequest(m_thread_request[i]))
Buffer_Critical[BufferPos] = -1;//标记缓冲区为空;
printf("消费者 %2d 成功消费 %2d:\n ",m_serial,m_thread_request[i]);
printf(" 缓冲区[ %2d ]:%3d \n" ,BufferPos,Buffer_Critical[BufferPos]);
ReleaseSemaphore(empty_semaphore,1,NULL);

else
printf("消费者 %2d 成功消费产品 %2d\n ",m_serial,m_thread_request[i]);

//离开临界区
LeaveCriticalSection(&PC_Critical[BufferPos]);


参考技术A 每生产一个信号量自增,指针自增。每消耗一个先信号量自减。追问

原理我懂,关键代码怎么写不懂

操作系统-进程进程通信机制

交互式并发进程通过信号量及PV操作可以实现进程的互斥与同步,例如生产者-消费者就是一组相互协作的进程,它们通过信号量来协同工作,并引入有界缓冲区来存取。这种低级通信方式不方便而且局限性很大。
管程适用于高级程序设计编程,它把分散在各进程中的临界区集中统一管理,采用阻塞/唤醒+集中临界区和一次状态测试策略,提供了同步工具的一种新选择。
上述工具可以解决同步和通信问题,但缺乏传递数据能力。因此设计了既有通信能力又有同步能力的工具——进程通信工具。不同形式的进程通信工具主要有:
一、信号通信机制
由进程执行产生的信号称为同步信号,如被0除;进程以外事件引起的信号称为异步信号,如击键。
用户、内核和进程都能生成发送信号:
(1)用户按中断组合键【Ctrl】+【C】,终端驱动程序接收到输入字符,并调用信号系统,信号系统发送SIGINT信号给shell,shell再把它发送给进程,进程收到SIGINT信号会撤销。用户也可以通过终端驱动程序分配给信号控制字符的其它按键来请求内核产生信号。
(2)进程执行出错时,内核通过中断、异常机制检测事件并向进程发送信号,例如非法段存取、浮点数溢出、非法操作码。内核也可以通过信号通知进程有特定事件发生。
(3)进程可通过系统调用kill给另一个进程发送SIGKIL信号强迫其终止。进程也可以通过信号与另一个进程通信。
信号机制完全由软件实现,进程接收到信号与CPU接收到中断类似,但信号是在异常处理末尾或时钟中断处理结束之前进行查询。
信号也有屏蔽设施,可以设置信号屏蔽位,使进程对于发来的信号不予理睬。
接收信号的进程所做的响应为:执行默认操作(如SIGINT信号的默认处理是进程撤销)、执行预置信号处理程序或忽略此信号。
信号处理程序往往由应用程序提供,通过信号编号查信号向量表(位于系统空间),从而找到并转入相应信号处理程序,在用户空间执行。
 
二、管道通信机制
多个进程使用一个共享的消息缓冲区(可称为管道、多路转接器、套接字)。一些进程往消息缓冲区中写入字符流(send/write);一些进程从消息缓冲区中读出字符流(receive/read)。信息交换单位基于长度任意的字符流。
技术分享图片
管道(pipeline)是UNIX的传统进程通信方式,它是连接读写的一个特殊文件,允许按照FIFO方式传送数据,也能使进程同步执行。
管道是单向的,发送进程视管道文件为输出文件,以字符流的形式把大量数据送入管道;接收进程视管道文件为输入文件,从管道中接收数据。
写进程和读进程不能仅依靠文件系统实现相互同步,还必须做到:
(1)进程在读写管道前,会测试文件inode节点的读写互斥标志,若已锁住进程便等待,否则把inode上锁然后进行读写操作。操作结束再解锁并唤醒因节点上锁而等待的进程。
(2)发送者接受者双方必须知道对方是否存在。对方不存在的话系统会通过SIGPIPE信号通知进程没必要再发送、接收信息。
(3)管道文件只使用inode节点的直接地址项,管道长度限制对于进程的write()和read()操作产生影响。管道溢出后write()操作必须暂停,以防管道溢出,直到其它进程从管道中读取数据。
解决方法:把数据进行切分,每次均小于管道限制的字节数,写完后此进程睡眠,直到读进程把管道中的数据取走并判别有进程等待时唤醒对方,以便继续写下一批数据;当读进程读空管道时,要出现read()阻塞,读进程睡眠直到写进程唤醒它。
Linux系统中,管道是利用文件系统的file结构和inode节点实现的。内核调用pipe()创建管道时,在系统打开文件表中建立管道文件的两个file结构分别用于控制管道的写操作和读操作。两个file结构指向同一个inode,而该inode又指向磁盘中的物理页面,返回句柄file[0]、file[1],读者进程通过files[0]从管道读出数据,写者进程通过files[1]向管道写入数据。
 
三、共享内存通信机制
一个进程首先创建一块内存区作为通信使用,而其它进程则将这块内存区映射到自己的虚存空间。因为不止一个进程可将共享内存映射到各自的虚地址空间中,读写共享内存区的代码段通常被认为是临界区。
由于进程的虚地址空间很大,所定义的共享内存区对应于一段未使用的虚地址区,以免和进程映像区冲突。
共享内存的页面在每个共享进程的页表中都有页表项引用,但无需在所有进程的虚存段都有相同地址。
 
四、消息传递通信机制
进程有时还需要交换更多的信息(如把数据传送给另一个进程),可以引进高级通信方式消息传递机制,实现进程间用信件来交换信息,Linux中称为消息队列。
直接通信方式:
发送或接收信件的进程,通过把把信件发送给进程P的send(P, 信件)原语和从进程Q接收信件的receive(Q, 信件)原语搭配使用,以指出信件发给谁或从谁那里接收信件。
间接通信方式:
信箱是存放信件的存储区域,每个信箱可以分成信箱特征和信箱体两部分:信箱特征指出信箱容量、信件格式、指针等;信箱体用来存放信件,信箱体分成若干个区,每个区可容纳一封信
多个进程共享一个信箱。发送或者接收信件通过有唯一标识符的信箱A、把件传送到信箱A的send(A, 信件)原语、从信箱A接收信件的receive(A, 信件)原语来进行。
若指定的信箱未满,则把信件送入信箱中指针所指示的位置,释放等待该信箱中信件的等待者;否则,发送信件者被置成等待信箱的状态
若指定信箱中有信件,则取出一封信件,释放等待信箱的等待者;否则,接收信件者被置成等待信箱中信件的状态
通信进程的同步:
发送进程执行send()发出消息后,本身的执行可分为两种情况:
(1)同步的(阻塞型)send()原语:发送进程等待接收进程回答消息后才继续进行,如果发送进程试图向一个并不存在的进程发送消息,操作系统将无法识别用哪个信箱来缓存消息,会返回一个错误码给发送方,发送方依赖错误码来工作
(1)异步的(非阻塞式)send()原语:发送进程将消息传送到接收进程的信箱中后,可以继续运行,直到某个时刻需要接收进程来回答消息时才查询和处理
接收端执行reveive()的进程也可以分为:
(1)阻塞型reveive()原语:接收进程直到消息交付完成都处于等待消息的状态,如果信箱中没有消息,接收进程会被挂起,直到有消息投入信箱;如果信箱中有消息,则立刻获得一条消息并返回。该方法能够同步发送进程和接收进程的通信。
(2)非阻塞型reveive()原语:不要求接收进程等待,当它需要消息时,再接收并处理消息。在查询信箱后立刻向调用进程返还控制权,如果信箱中有消息则返回消息,否则返回标志码以表明无消息。该方法允许接收进程轮询信箱,如果信箱中没有待处理消息,可以去做其它工作。
一般采用非阻塞型send()原语+阻塞型receive()原语。假设一组并发进程共享一个信箱Box,它可供所有进程在发送和接收时使用,该信箱被初始化为空状态。系统构建一条消息发送到box,希望进入临界区的进程首先试图接收消息,如果有消息则只有一个进程能接收到此消息并进入到临界区,其它进程取消息都将被阻塞,随后该进程将消息放回信箱。
消息就相当于进程间传递的令牌,以解决进程互斥问题:
create mailbox(box);
send(box , null);
void Pi() {
    message msg;
    while(true) {
        receive(box , msg);
        /*临界区*/
        seng(box , msg);
    }
}
cobegin
Pi();
coend;
使用消息传递机制解决生产者-消费者问题
int capacity;
void producer(void) {
    int item;
    messsage m;
    while(true) {
    item = produce_item(); //生产消息
    receive(consumer , &m); //等待消费者发送空缓冲区
    build_message(&m , item);
    send(consumer , &m);
    }
}
void consumer(void) {
    int item , i;
    message m;
    for(i=0; i<capacity; i++) send(producer ,&m)
    while(true) {
        receive(producer , &m);
        item = extract_item(&m); //接收含item的消息
        send(producer , &m); //回送空缓冲区给生产者
        consumer_item(item);
    }
}
系统为生产者和消费者均创建足够容纳capacity条消息的信箱,消费者向生产者信箱发送capacity条空消息(空缓冲区),生产者每生产出一个消息后就接收一条空消息并回送一条填充好的消息给消费者。系统中总的消息数保持不变。通信机制会知道目标信箱容纳哪些已被发送但尚未被目标进程接收的消息。
 
五、远程过程调用RPC
采用客户/服务器计算模式
服务器进程提供一系列过程/服务,供客户进程调用
客户进程通过调用服务器进程提供的过程/服务获得服务
考虑到客户计算机和服务器计算机的硬件异构型,外部数据表示XDR被引入来转换每台计算机的特殊数据格式为标准数据格式
基于RPC/XDR的高级通信规约如图所示:
技术分享图片

以上是关于在windows操作系统中用信号量机制解决生产者消费者问题的代码的主要内容,如果未能解决你的问题,请参考以下文章

操作系统-进程进程通信机制

在经典生产者消费者问题中,三个信号量可以设置为更少或更多吗

进程管理(二[2])

进程管理(二[2])

qt中信号与槽机制

理解同步的PV操作