Linux__生产者消费者模型
Posted Y—X
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux__生产者消费者模型相关的知识,希望对你有一定的参考价值。
1.生产者与消费者模型
1.1基于阻塞队列的生产者与消费者模型
321原则:
3: 三种关系,生产者VS生产者(互斥),生产者VS消费者(同步),消费者VS消费者(互斥)
2: 两种角色,生产者和消费者
1: 一个交易场所(这里就是一个线程安全的BlockQueue队列)
优点
- 可以解耦合:生产者和消费者都是通过队列进行交互。
- 支持忙闲不均:队列起到了缓冲的作用。
- 支持并发:生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
实现
用STL中的queue队列,一个生产者和一个消费者。
#ifndef __Queue_BLOCK_H_
#define __Queue_BLOCK_H_
#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>
class Task
public:
int x;
int y;
public:
Task()
Task(int _x, int _y)
:x(_x)
,y(_y)
int Run()
return x + y;
~Task()
;
class BlockQueue
private:
std::queue<Task> q;
size_t cap;
pthread_mutex_t lock;
pthread_cond_t c_cond;//消费者在该条件变量下等
pthread_cond_t p_cond;//生产者在该条件下等
public:
bool IsFull()
return q.size() >= cap;
bool IsEmpty()
return q.empty();
void LockQueue()
pthread_mutex_lock(&lock);
void UnLockQueue()
pthread_mutex_unlock(&lock);
void WakeUpConsumer()
std::cout << "wake up consumer ..." << std::endl;
pthread_cond_signal(&c_cond);
void WakeUpProductor()
std::cout << "wake up productor ..." << std::endl;
pthread_cond_signal(&p_cond);
void ConsumerWait()
std::cout << "consumer wait..." << std::endl;
pthread_cond_wait(&c_cond, &lock);
void Prodoctorwait()
std::cout << "productor wait ..." << std::endl;
pthread_cond_wait(&p_cond, &lock);//调用该函数,会自动释放lock,被唤醒时会重新拥有锁
public:
BlockQueue(size_t _cap)
:cap(_cap)
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&c_cond, nullptr);
pthread_cond_init(&p_cond, nullptr);
void Put(Task t)
LockQueue();
while(IsFull())//不能用if判断,防止挂起失败
WakeUpConsumer();
Prodoctorwait();
q.push(t);
UnLockQueue();
void Get(Task &t)
LockQueue();
while(IsEmpty())
WakeUpProductor();
ConsumerWait();
t = q.front();
q.pop();
UnLockQueue();
~BlockQueue()
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&c_cond);
pthread_cond_destroy(&p_cond);
;
#endif
#include "BlockQueue.hpp"
using namespace std;
void *consumer_run(void *arg)
BlockQueue *bq = (BlockQueue*)arg;
while(true)
Task t;
bq->Get(t);
cout << "consumer date is : " << t.x << "+" << t.y << " = " << t.Run() << endl;
sleep(1);
void *productor_run(void *arg)
BlockQueue *bq = (BlockQueue*)arg;
sleep(1);
while(true)
int x = rand()%10+1;
int y = rand()%100+1;
Task t(x, y);
bq->Put(t);
cout << "productor Task is :"<< x << "+" << y << " =? " << endl;
int main()
BlockQueue *bq = new BlockQueue(5);
pthread_t c,p;
pthread_create(&c, nullptr, consumer_run, (void*)bq);
pthread_create(&p, nullptr, productor_run, (void*)bq);
pthread_join(c,nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
2.POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
2.1 什么是信号量?
信号量是描述临界资源有效个数的计数器。
2.2为什么要有信号量?
临界资源可以看成多份,不冲突的,并行访问资源提高效率。
2.3信号量相关操作
初始化信号量:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量:
int sem_destroy(sem_t *sem);
等待信号量:
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量:
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
3.基于环形队列的生产消费模型
- 队列,使用数组模拟一个环形队列,用模运算来模拟环状特性。
- (当前下标+1)%数组的容量
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。
#pragma once
#include <iostream>
#include <semaphore.h>
#include <unistd.h>
#include <vector>
#define NUM 10
class RingQueue
private:
std::vector<int> v;
int max_cap;
sem_t sem_data;//消费者
sem_t sem_blank;//生产者
int c_index;
int p_index;
public:
void P(sem_t &s)
sem_wait(&s);//申请资源
void V(sem_t &s)
sem_post(&s);//返回资源
public:
RingQueue(int _cap=NUM)
:max_cap(_cap)
,v(_cap)
sem_init(&sem_blank, 0, max_cap);
sem_init(&sem_data, 0, 0);
c_index = 0;
p_index = 0;
void Put(const int &in)
P(sem_blank);
//生产
v[p_index] = in;
p_index++;
p_index %= max_cap;
V(sem_data);
void Get(int &out)
P(sem_data);//申请数据资源
//消费
out = v[c_index];
c_index++;
c_index %= max_cap;
V(sem_blank);//返还格子
~RingQueue()
sem_destroy(&sem_blank);
sem_destroy(&sem_data);
c_index = 0;
p_index = 0;
;
#include "RingQueue.hpp"
void *consumer(void *arg)
RingQueue *rq = (RingQueue*)arg;
while(true)
sleep(1);
int data = 0;
rq->Get(data);
std::cout << "consumer done ...#" << data << std::endl;
void *productor(void *arg)
RingQueue *rq = (RingQueue*)arg;
int count = 100;
while(true)
sleep(2);
rq->Put(count);
count++;
if(count > 110)
count = 100;
std::cout << "productor done ..." << std::endl;
int main()
RingQueue *rq = new RingQueue();
pthread_t c,p;
pthread_create(&c, nullptr, consumer, (void*)rq);
pthread_create(&p, nullptr, productor, (void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
以上是关于Linux__生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章