Linux-同步-条件变量接-生产者消费者模型

Posted 天津 唐秙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux-同步-条件变量接-生产者消费者模型相关的知识,希望对你有一定的参考价值。

1. 同步

  1.作用:让多个执行流在访问临界资源的时候是合理访问的。

2. 条件变量

2.1 本质

  一个PCB等待队列,条件变量 = PCB等待队列 + 一堆接口
PCB等待队列: 当线程发现资源不可用的时候,调用条件变量接口,将自己放在PCB等待队列,等待被唤醒。

2.2 条件变量接口

条件变量的类型: pthread_cond_t

2.2.1 初始化

静态初始化:
在这里插入图片描述
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

动态初始化:
在这里插入图片描述
  int pthread_cond_t init(pthread_cond_t* cond, const pthread_condattr_t *attr);
cond: 待要初始化的“条件变量”的变量
  一般情况下,传递一个pthread_cond_t类型变量的地址
attr: 一般情况下给NULL,采用默认属性

2.2.2 等待接口

在这里插入图片描述
  int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *mutex);
cond: 条件变量
mutex: 互斥锁
作用: 如果一个执行流调用了该接口,就会将执行流对应的PCB放到参数cond的PCB等待队列当中。

2.2.3 唤醒接口

在这里插入图片描述
  int pthread_cond_signal(pthread_cond_t *cond);
作用: 通知(唤醒)PCB等待队列当中的线程,如果被通知(唤醒)的线程收到了,则从PCB等待队列当中出队操作,正常执行代码。
注意: 至少唤醒一个PCB等待队列当中的线程。

在这里插入图片描述
  int pthread_cond_broadcast(pthread_cond_t *cond);
注意: 唤醒所有PCB等待队列当中的线程

2.2.4 销毁接口

在这里插入图片描述
  int pthread_cond_destroy(pthread_cond_t *cond);

3. 生产者与消费者模型

3.1 123规则

1.一个线程安全的队列
  队列:先进先出
  线程安全:当前这个队列在被其他线程操作的时候出队和入队是保证原子性的
2.两种角色的线程
  消费者线程:从线程安全队列当中获取元素,进行处理
  生产者线程:生产元素放到线程安全的队列当中进行处理
3.三种关系
  消费者与消费者互斥,生产者和生产者互斥,消费者与消费者互斥+同步
在这里插入图片描述

3.2 作用

  1.支持忙闲不均,可以提高程序运行效率。
  2.队列提供了一个缓冲区的作用,可以缓冲待要处理的元素。

4. 面试题

4.1 条件变量的等待接口为什么需要互斥锁?

  传递互斥锁的原因是由于需要在pthread_cond_wait函数内部进行解锁,解锁之后,其他的执行流就能获得这把互斥锁,否则,如果在调用pthread_cond_wait的线程,在进行等待的时候,不释放互斥锁,那么其他线程也不会获取到互斥锁,程序就没办法继续向前了。

4.2 在调用该接口的时候,pthread_cond_wait函数的实现逻辑?

  1.放在PCB等待队列
  2.释放互斥锁
  3.等待被唤醒

4.3 如果一个线程在等待的时候,被唤醒之后,需要做什么事情?

  1.移动出PCB等待队列再进行抢锁
  2.抢互斥锁
    抢到了:pthread_cond_wait函数返回了。
    没抢到:pthread_cond_wait函数没有返回,等待抢锁。

5. 代码实现

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

#define THREAD_NUM 4

int g_bowl = 1;
pthread_mutex_t g_lock;
//eat
pthread_cond_t g_cond;
//make
pthread_cond_t g_make_cond;

//eat
void* MyThreadEat(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        //判断是否能够吃
        while(g_bowl == 0)
        {
            //等待
            pthread_cond_wait(&g_cond, &g_lock);
        }
        printf("i eat %d, i am %p\\n", g_bowl, pthread_self());
        g_bowl--;
        pthread_mutex_unlock(&g_lock);
        //通知做面
        pthread_cond_signal(&g_make_cond);
    }
    return NULL;
}

void* MyThreadMake(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        //判断是否能够继续做面
        while(g_bowl == 1)
        {
            //等待
            pthread_cond_wait(&g_make_cond, &g_lock);
        }
        printf("i make %d, i am %p\\n", g_bowl, pthread_self());
        g_bowl++;
        pthread_mutex_unlock(&g_lock);
        //通知吃面
        pthread_cond_signal(&g_cond);
    }
    return NULL;
}

int main()
{
   pthread_mutex_init(&g_lock, NULL);
   pthread_cond_init(&g_cond, NULL);
   pthread_cond_init(&g_make_cond, NULL);
   pthread_t tid_A[THREAD_NUM], tid_B[THREAD_NUM];
   for(int i = 0; i < THREAD_NUM; i++)
   {
       int ret = pthread_create(&tid_A[i], NULL, MyThreadEat, NULL);
       if(ret < 0)
       {
           perror("pthread_create");
           exit(0);
       }
       ret = pthread_create(&tid_B[i], NULL, MyThreadMake, NULL);
       if(ret < 0)
       {
           perror("pthread_create");
           exit(0);
       }
   }
   for(int i = 0; i < THREAD_NUM; i++)
   {
       pthread_join(tid_A[i], NULL);
       pthread_join(tid_B[i], NULL);
   }

   pthread_mutex_destroy(&g_lock);
   pthread_cond_destroy(&g_cond);
   pthread_cond_destroy(&g_make_cond);
    return 0;
}

  

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#include <iostream>
#include <queue>

using namespace std;

#define THREAD_NUM 1

class RingQueue
{
    public:
        RingQueue()
        {
            //设置容量
            capacity_ = 10;
            //互斥锁初始化
            pthread_mutex_init(&lock_, NULL);
            //生产者的条件变量初始化
            pthread_cond_init(&cons_cond_, NULL);
            //消费者的条件变量初始化
            pthread_cond_init(&prod_cond_, NULL);
        }

        ~RingQueue()
        {
            //销毁互斥锁资源
            pthread_mutex_destroy(&lock_);
            //销毁生产者条件变量
            pthread_cond_destroy(&cons_cond_);
            //销毁消费者条件变量
            pthread_cond_destroy(&prod_cond_);
        }

        void Push(int data)
        {
            //加锁
            pthread_mutex_lock(&lock_);
            //队列中元素个数大于等于容量
            //不能再继续插入,将生产线程放入PCB等待队列
            while(que_.size() >= capacity_)
            {
                pthread_cond_wait(&prod_cond_, &lock_);
            }
            //插入数据,入队
            que_.push(data);
            printf("i product %d, i am %p\\n", data, pthread_self());
            //解锁
            pthread_mutex_unlock(&lock_);
            //唤醒PCB等待队列的消费者
            pthread_cond_signal(&cons_cond_);
        }

        void Pop(int* data)
        {
            //加锁
            pthread_mutex_lock(&lock_);
            //判断队列中的是否还有元素
            //如果没有消费者进入PCB等待队列
            while(que_.empty())
            {
                pthread_cond_wait(&cons_cond_, &lock_);
            }
            //将数据传递给出参
            *data = que_.front();
            printf("i consume %d, i am %p\\n", data, pthread_self());
            //将数据出队
            que_.pop();
            //解锁
            pthread_mutex_unlock(&lock_);
            //唤醒PCB等待队列的生产者
            pthread_cond_signal(&prod_cond_);
        }
    private:
        //模板 实例化一个int类型的队列
        queue<int> que_;
        //设置队列大小
        size_t capacity_;

        //定义一个锁资源
        pthread_mutex_t lock_;
        //定义一个生产者的条件变量
        pthread_cond_t cons_cond_;
        //定义一个消费者的条件变量
        pthread_cond_t prod_cond_;

        std::vector<int> vec_;
};

void* ConsumeStart(void* arg)
{
    RingQueue* rq = (RingQueue*)arg;
    while(1)
    {
        int data;
        rq->Pop(&data);
    }
    return NULL;
}

int data = 1;
pthread_mutex_t g_data_lock;

void* ProductStart(void* arg)
{
    RingQueue* rq = (RingQueue*)arg;
    while(1)
    {
        //这块需要加锁的原因是data是临界资源
        pthread_mutex_lock(&g_data_lock);
        rq->Push(data++);
        pthread_mutex_unlock(&g_data_lock);
    }
    return NULL;
}

int main()
{
    pthread_mutex_init(&g_data_lock, NULL);
    RingQueue* rq = new RingQueue();
    if(rq == NULL)
    {
        return 0;
    }

    pthread_t cons[THREAD_NUM];
    pthread_t prod[THREAD_NUM];

    for(int i = 0; i < THREAD_NUM; i++)
    {
        int ret = pthread_create(&cons[i], NULL, ConsumeStart, (void*)rq);
        if(ret < 0)
        {
            perror("pthread_create fail");
            return 0;
        }

        ret = pthread_create(&prod[i], NULL, ProductStart, (void*)rq);
        if(ret < 0)
        {
            perror("pthread_create fail");
            return 0;
        }
    }

    for(int i = 0; i < THREAD_NUM; i++)
    {
        pthread_join(cons[i], NULL);
        pthread_join(prod[i], NULL);
    }

    delete rq;
    pthread_mutex_destroy(&g_data_lock);

    return 0;
}

以上是关于Linux-同步-条件变量接-生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

Linux:详解多线程(线程同步信号量和生产者与消费者模型的实现)

线程同步之经典模型 —— 生产者消费者模型

线程同步之经典模型 —— 生产者消费者模型

线程同步之经典模型 —— 生产者消费者模型

Linux线程同步与互斥/生产消费者模型

C++实现 生产者消费者模型