Linux__生产者消费者模型

Posted Y—X

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux__生产者消费者模型相关的知识,希望对你有一定的参考价值。

1.生产者与消费者模型

1.1基于阻塞队列的生产者与消费者模型

321原则:

3: 三种关系,生产者VS生产者(互斥),生产者VS消费者(同步),消费者VS消费者(互斥)
2: 两种角色,生产者和消费者
1: 一个交易场所(这里就是一个线程安全的BlockQueue队列)

优点

  1. 可以解耦合:生产者和消费者都是通过队列进行交互。
  2. 支持忙闲不均:队列起到了缓冲的作用。
  3. 支持并发:生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。

实现


用STL中的queue队列,一个生产者和一个消费者。

#ifndef __Queue_BLOCK_H_
#define __Queue_BLOCK_H_ 

#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>

class Task

  public:
    int x;
    int y;
  public:
    Task()
    

    Task(int _x, int _y)
      :x(_x)
      ,y(_y)
  

    int Run()
    
      return x + y;
    
    ~Task()
    
;

class BlockQueue

  private:
    std::queue<Task> q;
    size_t cap;
    pthread_mutex_t lock;
    pthread_cond_t c_cond;//消费者在该条件变量下等
    pthread_cond_t p_cond;//生产者在该条件下等
  public:
    bool IsFull()
    
      return q.size() >= cap;
    
    
    bool IsEmpty()
    
      return q.empty();
    
    
    void LockQueue()
    
      pthread_mutex_lock(&lock);
    
    
    void UnLockQueue()
    
      pthread_mutex_unlock(&lock);
    

    void WakeUpConsumer()
    
      std::cout << "wake up consumer ..." << std::endl;
      pthread_cond_signal(&c_cond);
    

    void WakeUpProductor()
    
      std::cout << "wake up productor ..." << std::endl;
      pthread_cond_signal(&p_cond);
    

    void ConsumerWait()
    
      std::cout << "consumer wait..." << std::endl;
      pthread_cond_wait(&c_cond, &lock);
    

    void Prodoctorwait()
    
      std::cout << "productor wait ..." << std::endl;
      pthread_cond_wait(&p_cond, &lock);//调用该函数,会自动释放lock,被唤醒时会重新拥有锁
    

  public:
    BlockQueue(size_t _cap)
      :cap(_cap)
    
      pthread_mutex_init(&lock, nullptr);
      pthread_cond_init(&c_cond, nullptr);
      pthread_cond_init(&p_cond, nullptr);

    
    
    void Put(Task t)
    
      LockQueue();
      while(IsFull())//不能用if判断,防止挂起失败
      
        WakeUpConsumer();
        Prodoctorwait();
      
      q.push(t);
      UnLockQueue();
    
    
    void Get(Task &t)
    
      LockQueue();
      while(IsEmpty())
      
        WakeUpProductor();
        ConsumerWait();
      
      t = q.front();
      q.pop();
      UnLockQueue();
    

    ~BlockQueue()
    
      pthread_mutex_destroy(&lock);
      pthread_cond_destroy(&c_cond);
      pthread_cond_destroy(&p_cond);
    

;

#endif 
#include "BlockQueue.hpp"

using namespace std;

void *consumer_run(void *arg)

  BlockQueue *bq = (BlockQueue*)arg;
  while(true)
  
    Task t;
    bq->Get(t);
    cout << "consumer date is : " << t.x << "+" << t.y << " = " << t.Run() << endl;
    sleep(1);
  


void *productor_run(void *arg)

  BlockQueue *bq = (BlockQueue*)arg;
  sleep(1);
  while(true)
  
    int x = rand()%10+1;
    int y = rand()%100+1;
    Task t(x, y);
    bq->Put(t);
    cout << "productor Task is :"<< x << "+" << y << " =? " << endl;
  



int main()

  BlockQueue *bq = new BlockQueue(5);
  pthread_t c,p;
  pthread_create(&c, nullptr, consumer_run, (void*)bq);
  pthread_create(&p, nullptr, productor_run, (void*)bq);

  pthread_join(c,nullptr);
  pthread_join(p, nullptr);

  delete bq;
  return 0;


2.POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

2.1 什么是信号量?

信号量是描述临界资源有效个数的计数器。

2.2为什么要有信号量?

临界资源可以看成多份,不冲突的,并行访问资源提高效率。

2.3信号量相关操作

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
	pshared:0表示线程间共享,非零表示进程间共享
	value:信号量初始值

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量:

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V()

3.基于环形队列的生产消费模型

  • 队列,使用数组模拟一个环形队列,用模运算来模拟环状特性。
  • (当前下标+1)%数组的容量

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。

#pragma once 

#include <iostream>
#include <semaphore.h>
#include <unistd.h>
#include <vector>

#define NUM 10

class RingQueue 

  private:
    std::vector<int> v;
    int max_cap;
    sem_t sem_data;//消费者
    sem_t sem_blank;//生产者

    int c_index;
    int p_index;
  public:
    void P(sem_t &s)
    
      sem_wait(&s);//申请资源
    

    void V(sem_t &s)
    
      sem_post(&s);//返回资源
    

  public:
    RingQueue(int _cap=NUM)
      :max_cap(_cap)
      ,v(_cap)
    
      sem_init(&sem_blank, 0, max_cap);
      sem_init(&sem_data, 0, 0);
      c_index = 0;
      p_index = 0;
    

    void Put(const int &in)
    
      P(sem_blank);
      //生产
      v[p_index] = in;
      p_index++;
      p_index %= max_cap;
      V(sem_data);
    

    void Get(int &out)
    
      P(sem_data);//申请数据资源
      //消费
      out = v[c_index];
      c_index++;
      c_index %= max_cap;
      V(sem_blank);//返还格子
    

    ~RingQueue()
    
      sem_destroy(&sem_blank);
      sem_destroy(&sem_data);
      c_index = 0;
      p_index = 0;
    
;

#include "RingQueue.hpp"

void *consumer(void *arg)

  RingQueue *rq = (RingQueue*)arg;
  
  while(true)
  
    sleep(1);
    int data = 0;
    rq->Get(data);
    std::cout << "consumer done ...#" << data << std::endl;
  


void *productor(void *arg)

  RingQueue *rq = (RingQueue*)arg;
  int count = 100;
  while(true)
  
    sleep(2);
    rq->Put(count);
    count++;
    if(count > 110)
    
      count = 100;
    
    std::cout << "productor done ..." << std::endl;
  


int main()

  RingQueue *rq = new RingQueue();
  pthread_t c,p;
  pthread_create(&c, nullptr, consumer, (void*)rq);
  pthread_create(&p, nullptr, productor, (void*)rq);

  pthread_join(c, nullptr);
  pthread_join(p, nullptr);
  
  delete rq;
  return 0;


以上是关于Linux__生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

Linux系统编程5_条件变量与互斥锁

Linux__生产者消费者模型

Linux多线程_(Posix信号量实现环形队列生产者消费者模型)

死锁的几个例子(待完善)

操作系统_进程同步_李善平ppt

Linux信号量