Linux多线程——生产者消费者模型

Posted 两片空白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux多线程——生产者消费者模型相关的知识,希望对你有一定的参考价值。

目录

一.生产者消费者模型

        1.1 什么是生成者消费者模型

        1.2 生产者消费者模型的优点

        1.3 基于阻塞队列实现生产者消费者模型

         1.4 POSIX信号量

        1.4.1 信号量概念

        1.4.2 P操作和V操作

        1.4.3 理解信号量

        1.4.4 信号量的函数

         1.4.5 基于环形队列实现生产者消费者模型


一.生产者消费者模型

        1.1 什么是生成者消费者模型

        一个进程中的线程有两种角色,一种是生产者,一种是消费者。生产者为消费者提供任务,消费者拿到任务,解决任务。在生成者和消费者之间还有一个"交易场所",是一个内存块。生成者线程将任务放到内存块中,消费者线程在内存块中拿任务。当内存块数据达到一高水位线时,生产者会进行等待,唤醒消费者拿任务,当内存块数据达到一低水位线时,消费者会等待,并且唤醒生产者生产任务。

        生成者,消费者存在着3种关系。生产者和生产者之间是互斥的关系,消费者和消费者之间是互斥的关系,生产者和消费者之间是互斥和同步的关系。

        1.2 生产者消费者模型的优点

        例如一个正常的函数,不使用生产者消费者模型:

       上面是单线程的情况,即使是多线程,不使用生产者消费者模型,生产者直接给消费者送数据,整个进程的效率会是最慢的线程的效率。并且只能生产一个数据,消费一个数据。两者还是串行的,耦合度高。

        使用生产者消费者模型:

生产者和消费者模型的优点:

  • 实现线程的解耦
  • 支持线程之间并行运行
  • 效率高
  • 方便代码维护

        1.3 基于阻塞队列实现生产者消费者模型

        在多线程编程中,阻塞队列是一种常用于实现生产者和消费者模型的数据结构。其普通队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入元素,当队列满的时候,往队列中存放元素的操作也会被阻塞,直到有元素从队列中取出。

实现的是多生产者,多消费者的模型。

#pragma once 

#include<iostream>
#include<queue>
#include<pthread.h>
//队列元素个刷
#define NUM 5
//任务
struct Task{
  Task(){};
  Task(int x,int y)
    :_x(x)
    ,_y(y)
  {}
  int _x;
  int _y;
  int Add(){
    return _x + _y;
  }
};
//提供两个接口,放任务,和拿任务
class blockqueue{
private:
  //加锁
  void MakeLock(){
    pthread_mutex_lock(&_lock);
  }
  //取消锁
  void CancelLock(){
    pthread_mutex_unlock(&_lock);
  }
  //唤醒消费者
  void WakeUpConsumer(){
    std::cout<<"Consumer wake up"<<std::endl;
    pthread_cond_signal(&_empty);

  }
  //唤醒生产者
  void WakeUpProductor(){
    std::cout<<"Productor wake up"<<std::endl;
    pthread_cond_signal(&_full);

  }
  //生产者等待
  void SleepProductor(){
    std::cout<<"Productor sleep"<<std::endl;
    pthread_cond_wait(&_full, &_lock);

  }
  //消费者等待
  void SleepConsumer(){
    std::cout<<"Consumer sleep"<<std::endl;
    pthread_cond_wait(&_empty, &_lock);

  }

public:
  blockqueue(size_t cap = NUM)
    :_cap(cap)
  {
    pthread_mutex_init(&_lock, nullptr);
    pthread_cond_init(&_full, nullptr);
    pthread_cond_init(&_empty, nullptr);
  }
  ~blockqueue(){
    pthread_cond_destroy(&_empty);
    pthread_cond_destroy(&_full);
    pthread_mutex_destroy(&_lock);

  }
  //放数据
  void Put(Task in){
    //q队列是临界资源,需要加锁
    MakeLock();
    //需要使用循环
    while(_q.size()>=_cap){
      WakeUpConsumer();
      SleepProductor();
    }
    _q.push(in);
    CancelLock();

  }
  //获取数据
  void Get(Task& out){
    MakeLock();
    while(_q.empty()){
      WakeUpProductor();
      SleepConsumer();
    }
    out=_q.front();
    _q.pop();
    CancelLock();

  }

private:
  std::queue<Task> _q;
  size_t _cap;
  pthread_mutex_t _lock;
  pthread_cond_t _empty;//消费者在此等待
  pthread_cond_t _full;//生产者在此等待

};


#include"BlockQueue.hpp"
#include<unistd.h>

#define PRO 2
#define CON 2
using namespace std;
//定义两个互斥量,生产者和消费者之间要互相竞争锁
//决定哪个线程进来
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;

void *Productor(void *arg){
  sleep(1);
  blockqueue *q = (blockqueue *)arg;
  while(true){
    sleep(1);
    int x=rand()%9+1;
    int y=rand()%20+1;
    Task t(x,y);
	//阻塞队列是共享资源,需要上锁
    pthread_mutex_lock(&mutex2);
    q->Put(t);
    cout<<pthread_self()<<":"<<x<<"+"<<y<<"="<<"?"<<endl;
    pthread_mutex_unlock(&mutex2);
  }
}
void *Consumer(void *arg){
  blockqueue *q = (blockqueue *)arg;
  while(true){
    sleep(1);
    Task t;
	//阻塞队列是共享资源,需要上锁
    pthread_mutex_lock(&mutex1);
    q->Get(t);
    cout<<pthread_self()<<":"<<t._x<<"+"<<t._y<<"="<<t.Add()<<endl;
    pthread_mutex_unlock(&mutex1);
  }

}

int main(){
  pthread_mutex_init(&mutex2,nullptr);
  pthread_mutex_init(&mutex1,nullptr);
  blockqueue *bq = new blockqueue();
  //生产者线程
  pthread_t td1[PRO];
  int i=0;
  for(; i<PRO; i++){
    pthread_create(td1+i, nullptr, Productor, (void *)bq);
  }
  //消费者线程
  pthread_t td2[CON];
  for(i=0; i<CON; i++){
    pthread_create(td2+i, nullptr, Consumer, (void *)bq);
  } 
  
  for(i=0; i<PRO; i++){
    pthread_join(td1[i], nullptr);
  }
  for(i=0; i<CON; i++){
    pthread_join(td2[i], nullptr);
  }
  pthread_mutex_destroy(&mutex2);
  pthread_mutex_destroy(&mutex1);
  delete bq;
  return 0;
}

 演示:

         1.4 POSIX信号量

        1.4.1 信号量概念

        有一种情况,我们可以将临界资源分成若干份,一个线程只会使用临界资源中的一份。

        这个时候就有了信号量,信号量本质是一个计数器,描述的是临界资源的有效个数。

        1.4.2 P操作和V操作

        假如:临界资源可以分成5个部分,记为count=5。count就被称作信号量。

        count--,一个执行流占有一个部分的操作叫做P操作。

        count++,一个执行流结束使用临界资源的一部风叫做V操作。

        当信号量count==0时,如果进行P操作,没有信号量可以分配了,此时会阻塞等待。

        由于信号量每一个线程看到的是同一份资源,信号量也是临界资源,要保证P,V操作是原子的。

        二元信号量相当于互斥锁: 二元信号量只有1个信号量,只要一个线程占有,信号量的值就等于0,其它线程就需要等待。

        1.4.3 理解信号量

        OS中会有很多的信号量,OS系统需要对它们进行管理,管理需要进行描述:

信号量可以描述为:

struct sem{

......

int count;//临界资源有效个数

mutex lock;//只允许一个线程对临界资源进行操作,需要上锁

wait_queue *head;//等待队列

......

}

        1.4.4 信号量的函数

  • 初始化
#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
作用:初始化信号量
参数:
    sem,要初始化的信号量
    pshared:0表示线程间共享,非0表示进程间共享
    value:信号量初始值,信号量个数
  • 销毁信号量
#include <semaphore.h>

int sem_destroy(sem_t *sem);
作用:销毁定义的信号量
参数:
    sem:要销毁的信号量
  • 等待信号量,P操作
 #include <semaphore.h>

int sem_wait(sem_t *sem);
作用:等待信号量,将信号量的值减1,如果信号量为0,阻塞等待
参数:
    sem:要等待的信号量

  • 发布信号,V操作
#include <semaphore.h>
int sem_post(sem_t *sem);

作用:表示资源使用完毕,将信号量做加1操作
参数:
    sem:要发布的信号量

         1.4.5 基于环形队列实现生产者消费者模型

  • 环形队列采用数组模拟,用模运算来模拟环形特征
  • 当队列满了或者队列为空时,都是消费者的下标和生产者的下标相同。不好判断为空和满的情况。
    • 有两种方法:
    • 1.少用一个元素空间,这个时候为空时,下标相等,为满时,生产者下标加1在取模等于消费者下标。
    • 2.增加一个计数器,来记录元素个数。
  • 我们这里正好有信号量这个计数器,队列里的每一个位置代表一个信号量。正好信号量就是这个计数器。

定义两个信号量,一个信号量表示空格字space_sem,一个信号量表示数据_data_sem。

生产者:放元素,关注的说空格子这个信号量。

伪代码:

        P(space_sem)

        生产数据

        V(data_sem)

消费者:拿元素,关注的是数据这个信号量。 

伪代码:

        P(data_sem)

        生产数据

        V(space_sem)

        执行到同一位置时,为空或者满,此时要不就是space_sem为临界资源总有效个数,data_sem为0,要不就是data_sem为临界资源总有效个数,space_sem为0。这个时候,放数据和拿数据总会有一个在等待(P操作)。

        当生产者快,消费者慢时,一开始生产者将数据放满,在消费者消费一个,在生产者生产一个。队列经常是满的。

        当生产者,消费者快时,一开始没数据,需要生产者生产,在消费一个,现象时生产一个消费一个,队列经常是空的。

多消费者多生产者:

#pragma once 
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
#define NUM 5

class RingQueue{
  private:
    void P(sem_t& s){
      //信号量减减操作,如果为0等待
      sem_wait(&s);
    }
    void V(sem_t& s){
      //信号量加加操作
      sem_post(&s);
    }
  public:
    RingQueue(size_t cap = NUM)
      :_v(cap)
      ,_cap(cap)
      ,_cindex(0)
      ,_pindex(0)
      {
        sem_init(&_space_sem, 0, cap);
        sem_init(&_data_sem, 0, 0);
      }

    void Put(const int& in){
      //生产者关注格子数
      P(_space_sem);
      _v[_pindex]=in;
      _pindex++;
      _pindex %= _cap;
      V(_data_sem);

    }
    void Get(int& out){
      //消费者关注数据
      P(_data_sem);
      out = _v[_cindex];
      _cindex++;
      _cindex %= _cap;
      V(_space_sem);

    }

    ~RingQueue(){
      sem_destroy(&_space_sem);
      sem_destroy(&_data_sem);
      _cindex = 0;
      _pindex = 0;
    }

  private:
    std::vector<int> _v;//队列
    size_t _cap;//队列容量
    sem_t _space_sem;//格子信号量
    sem_t _data_sem;//数据信号量
    
    int _cindex;//消费者位置
    int _pindex;//生产者位置

};
#include"RingQueue.hpp"
#include<unistd.h>

using namespace std;

#define CON 4
#define PRO 4

pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
void *consumer(void *arg){
  RingQueue *rq=(RingQueue *)arg;
  while(1){
    sleep(1);
    int x=0;
    pthread_mutex_lock(&mutex1);
    rq->Get(x);
    pthread_mutex_unlock(&mutex1);
    cout<<pthread_self()<<":"<<"consumer get a data :"<<x<<endl;

  }
}

void *productor(void *arg){
  RingQueue *rq=(RingQueue *)arg;
  while(1){
    //sleep(1);
    int x=rand()%10+1;
    pthread_mutex_lock(&mutex2);
    rq->Put(x);
    pthread_mutex_unlock(&mutex2);
    cout<<pthread_self()<<":"<<"productor put a data :"<<x<<endl;

  }
}
int main(){
  RingQueue *rq = new RingQueue();
  pthread_t td1[CON];
  pthread_t td2[PRO];
  int i=0;
  for(; i<CON; i++){
    pthread_create(td1+i, nullptr, consumer, (void *)rq);
  }
  for(i=0; i<PRO; i++){
    pthread_create(td2+i, nullptr, productor, (void *)rq);
  }
  

  for(i=0; i<CON; i++){
    pthread_join(td1[i], nullptr);
  }
  for(i=0; i<PRO; i++){
    pthread_join(td2[i], nullptr);
  }
  delete rq;

  return 0;
}

以上是关于Linux多线程——生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

生产者消费者模型

Linux多线程_(Posix信号量实现环形队列生产者消费者模型)

Linux多线程——生产者消费者模型

Linux入门多线程(线程概念生产者消费者模型消息队列线程池)万字解说

Linux多线程_(线程同步,基于阻塞队列的生产者消费者模型)

万字详解Linux系列多线程(下)