多线程编程之生产者消费者问题

Posted 拙园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程编程之生产者消费者问题相关的知识,希望对你有一定的参考价值。

 生产者-消费者问题(Producer-consumer problem),也称作有限缓冲问题(Bounded-buffer problem),是多线程领域的一个经典问题,可以描述为:两个或者更多个线程共享同一个缓冲区,其中一个或多个作为“生产者”会不断地向缓冲区中添加数据,另外的一个或者多个作为“消费者”从缓冲区中取走数据。这个问题的关键在于:要保证缓冲区满了之后“生产者”不能再继续添加数据,而缓冲区空了之后“消费者”不能再取走数据了。

  这个问题在多个“生产者”和“消费者”的情况下肯定要麻烦一点,所以先看一下只有一个“生产者”和一个“消费者”以及一个元素缓冲区的情况。这时候情况可以简化为:

  1. 从缓冲区中取走数据和向缓冲区中添加数据需要互斥操作保持同步,所以,这里需要用临界区或者互斥量来实现;
  2. 生产者要等待缓冲区“有空间”(由于这里缓冲区只有一个元素,所以等价于要等待缓冲区为空)才能添加数据,同样消费者也不能在缓冲区为空的时候取数据。这两个过程都需要事件或者信号量来通知进行。

  考虑好了上述两点要求,就可以设计出如下思路算法:

技术分享图片
/* 针对生产者的算法1: */
{   WaitForSingleObject(hEmpty, INFINITE);   WaitForSingleObject(hMutex, INIFINITE);   /* 生产者的活动 */   ReleaseMutex(hMutex);   ReleaseSemaphore(hFull, 1, NULL);
} /* 针对消费者的算法1: */
{   WaitForSingleObject(hFull, INFINITE);   WaitForSingleObject(hMutex, INIFINITE);   /* 消费者的活动 */   ReleaseMutex(hMutex);   ReleaseSemaphore(hEmpty, 1, NULL);
}
技术分享图片

   当然,生产者和消费者的互斥操作不用hMutex而改用EnterCriticalSection临界区也是一样的。举一个实例看看如何应用生产者-消费者算法:

技术分享图片
技术分享图片
typedef struct _MESSAGE_QUEUE    /* 消息队列的数据结构 */
{
    int threadId;
    int msgType[MAX_NUMBER];
    int count;
    HANDLE hFull;
    HANDLE hEmpty;
    HANDLE hMutex;
}MESSAGE_QUEUE;

/* 发送消息,类似于“生产者” */
void send_mseesge(int threadId, MESSAGE_QUEUE* pQueue, int msg)    
{
    assert(NULL != pQueue);
    
    if(threadId != pQueue->threadId)
        return;

    WaitForSingleObject(pQueue->hEmpty, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    pQueue->msgType[pQueue->count ++] = msg;
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hFull, 1, NULL);    
}

/* 接收消息,类似于“消费者” */
void get_message(MESSAGE_QUEUE* pQueue, int* msg)
{
    assert(NULL != pQueue && NULL != msg);

    WaitForSingleObject(pQueue->hFull, INFINITE);
    WaitForSingleObject(pQueue->hMutex, INFINITE);
    *msg = pQueue->msgType[pQueue->count --];
    ReleaseMutex(pQueue->hMutex);
    ReleaseSemaphore(pQueue->hEmpty, 1, NULL);   
}
技术分享图片

   搞清楚了一个生产者和一个消费者以及一个元素的缓冲区的简单模式,下面再看看如果消费者有两个而缓冲区可以容纳四个元素的情况:

技术分享图片
技术分享图片
//1生产者,2消费者,4缓冲区
#include <stdio.h>
#include <process.h>
#include <windows.h>

const int END_PRODUCE_NUMBER = 8;  // 生产产品个数
const int BUFFER_SIZE = 4;          // 缓冲区个数
int g_Buffer[BUFFER_SIZE];          // 缓冲池
int g_i, g_j;
CRITICAL_SECTION g_cs;    // 信号量与关键段
HANDLE g_hSemaphoreBufferEmpty, g_hSemaphoreBufferFull;

// 生产者线程函数
unsigned int __stdcall ProducerThreadFun(PVOID pM)
{
    for (int i = 1; i <= END_PRODUCE_NUMBER; i++)
    {
        // 等待“缓冲区有剩余空间”的信号!
        WaitForSingleObject(g_hSemaphoreBufferEmpty, INFINITE);

        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        g_Buffer[g_i] = i;
        g_i = (g_i + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        // 通知消费者“缓冲区有数据”了!
        ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
    }
    printf("生产者完成任务,线程结束运行\n");
    return 0;
}

// 消费者线程函数
unsigned int __stdcall ConsumerThreadFun(PVOID pM)
{
    while (true)
    {
        // 等待“缓冲区中有数据”的信号
        WaitForSingleObject(g_hSemaphoreBufferFull, INFINITE);
        
        // 互斥的访问缓冲区
        EnterCriticalSection(&g_cs);
        if (g_Buffer[g_j] == END_PRODUCE_NUMBER) // 结束标志
        {
            LeaveCriticalSection(&g_cs);
            // 通知其它消费者有新数据了(结束标志)
            ReleaseSemaphore(g_hSemaphoreBufferFull, 1, NULL);
            break;
        }
        g_j = (g_j + 1) % BUFFER_SIZE;
        LeaveCriticalSection(&g_cs);

        Sleep(50);     // some other work to do
        ReleaseSemaphore(g_hSemaphoreBufferEmpty, 1, NULL);      // 给缓冲区增加一个空间
    }

    return 0;
}
int main()
{
    InitializeCriticalSection(&g_cs);
    // 初始化信号量,一个记录有产品的缓冲区个数,另一个记录空缓冲区个数
    g_hSemaphoreBufferEmpty = CreateSemaphore(NULL, 4, 4, NULL);    // 指定缓冲区初始状态存在四个“剩余空间”
    g_hSemaphoreBufferFull  = CreateSemaphore(NULL, 0, 4, NULL);
    g_i = 0;
    g_j = 0;
    memset(g_Buffer, 0, sizeof(g_Buffer));

    const int THREADNUM = 3;
    HANDLE hThread[THREADNUM];
    //生产者线程
    hThread[0] = (HANDLE)_beginthreadex(NULL, 0, ProducerThreadFun, NULL, 0, NULL);
    //消费者线程
    hThread[1] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    hThread[2] = (HANDLE)_beginthreadex(NULL, 0, ConsumerThreadFun, NULL, 0, NULL);
    WaitForMultipleObjects(THREADNUM, hThread, TRUE, INFINITE);
    for (int i = 0; i < THREADNUM; i++)
        CloseHandle(hThread[i]);

    // 销毁信号量和关键段
    CloseHandle(g_hSemaphoreBufferEmpty);
    CloseHandle(g_hSemaphoreBufferFull);
    DeleteCriticalSection(&g_cs);
    return 0;
}    
技术分享图片

  上面的代码思路很简单,一个生产者就一直等待“缓冲区有剩余空间”这个信号,而两个消费者就一直等待“缓冲区有数据”这个信号就行了!操作缓冲区的时候采取Mutex互斥操作防止冲突。注意,在创建信号量的初期要指定初始信号量的个数,这个信号量个数决定了缓冲区的大小。每一次WaitForSingleObject都会使信号量减一,而每一次ReleaseSemaphore都会使信号量加一。

  

小结:

  “生产者-消费者”问题只需考虑两个方面即可:

  1. 生产者和消费者要对缓冲区互斥操作
  2. 生产者要等待“缓冲区有空间”这个信号才能添加数据,消费者要等待“缓冲区有数据”这个信号才能取出数据。




以上是关于多线程编程之生产者消费者问题的主要内容,如果未能解决你的问题,请参考以下文章

多线程编程之生产者消费者问题

Linux线程编程之生产者消费者问题

C++11多线程编程——生产消费者模型之条件变量

百万年薪python之路 -- 并发编程之 多线程 一

JUC并发编程 多线程设计模式 -- 异步模式之生产者/消费者

(王道408考研操作系统)第二章进程管理-第三节6:经典同步问题之生产者与消费者问题