Linux多线程_(线程同步,基于阻塞队列的生产者消费者模型)
Posted 楠c
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux多线程_(线程同步,基于阻塞队列的生产者消费者模型)相关的知识,希望对你有一定的参考价值。
目录
1. 线程同步的概念
在保证数据安全的情况下,让多个执行流,按照特定顺序对临界资源进行访问,叫做同步。
2. 为什么要有线程同步
线程互斥是为了,让访问临界资源不会出错。假如有一个竞争力强的线程一直执行任务,那么其他线程竞争不过他,虽然这个过程中也没有出错,但是效率变低,而线程同步则是当一个进程结束工作,通知下一个进程来执行,这样协同工作可以更高效的完成任务。
而这种通知事件,就是用条件变量完成的。
struct cond
{
int value;
wait_queue* head;
}
再来梳理一下学习的过程,由于多个线程访问临界资源,引起线程安全问题,所以引进了锁。由于锁,使得进程互斥,每次只有一个线程访问临界资源,但是如果这个线程的优先级比其他线程高,使得每次都是这个线程执行任务,其他线程竞争不过,就浪费掉了。所以又引进了条件变量。
2.1 条件变量初始化
2.2 销毁
2.3 等待
为什么等待的时候需要传入一个锁呢?
首先为什么要等呢,是因为条件不满足,怎么知道的呢?是经过判断,进入临界区才能判断,进入临界区就要加锁进入。因为条件不满足,所以要在这个函数调用时就必须释放锁,让其他线程申请锁做他的任务直到条件满足。那么当条件满足,函数执行返回在临界区,就必须要加上锁。
而整个释放锁,加锁过程在这个函数内完成。
例如后面模型中,生产者等的时候,释放锁,唤醒消费者。消费者申请锁,申请到了,消费,消费完之后,他等待,释放锁,唤醒生产者。生产者申请锁,生产,生产完之后,等待,释放锁,唤醒消费者。
2.4 唤醒等待
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
#include<math.h>
pthread_mutex_t lock;
pthread_cond_t cond;
void *routine_r1(void* args)
{
while(1)
{
pthread_cond_wait(&cond,&lock);
printf("%s get cond",(char*)args);
printf("活动中...\\n");
}
}
void* routine_r2(void* args)
{
while(1)
{
sleep(rand()%3+1);
pthread_cond_signal(&cond);
printf("%s signal \\n",(char*)args);
}
}
int main()
{
pthread_t t1,t2,t3,t4,t5;
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&cond,NULL);
pthread_create(&t1,NULL,routine_r2,(void*)"thread 1");
pthread_create(&t2,NULL,routine_r1,(void*)"thread 2");
pthread_create(&t3,NULL,routine_r1,(void*)"thread 3");
pthread_create(&t4,NULL,routine_r1,(void*)"thread 4");
pthread_create(&t5,NULL,routine_r1,(void*)"thread 5");
pthread_join(t1,NULL);
pthread_join(t2,NULL);
pthread_join(t3,NULL);
pthread_join(t4,NULL);
pthread_join(t5,NULL);
return 0;
}
2,3,4,5号线程执行任务,执行任务时现在条件变量下等待,中控1号线程唤醒,开始执行任务。
3. 生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者,消费者:进程或者线程
交际场所:一个内存块(可以为链表,也可以是数组)
产品:数据
1个交易场所:内存
2个角色:生产者消费者
3个关系:
他们的主要关系就是这样。
第一保证他们不能出错。
- 生产者与生产者之间互斥。
生产者再往内存写数据的时候,另一个生产者不能写。 - 消费者与消费者之间互斥。
消费者取数据的时候,令一个消费者不能取 - 生产者与消费者之间互斥。同步。
生产者写数据的时候,消费者不能取。
消费者取数据的时候,生产者不能写。
生产者生产完,通知消费者来取。
消费者消费完,通知生产者生产。
第二保证提高效率。
- 生产者与生产者之间其实也要同步
存在多个生产者,不能让一个竞争力强的生产者,一直生产。 - 消费者与消费者之间其实也要同步
存在多个消费者,不能让一个竞争力强的消费者,一直消费。
3.1 基于阻塞队列的生产者消费者模型
首先阻塞队列有个最大值cap,生产者生产数据达到最大值,生产者等待,通知消费者开始消费,消费者消费完,等待,通知生产者生产。所以需要两个条件变量。
生产者生产数据,假如生产满了,唤醒消费者,然后自己在等待。
消费者消费数据,假如消光了,唤醒生产者,然后等待。
3.1.1 实验现象
当消费者慢,生产者快时。一瞬间生产满,消费者慢慢才消费。
当生产者慢时,消费者正常速度。
可以看到,生产者慢慢的再生产,满的时候一瞬间被消费。
至于上面那个生产一个被消费是因为,不确定哪个线程先执行,假如消费者先执行,那么就会直接挂起,假如生产者先执行,生产了1个,解锁,消费者竞争力强拿到锁进来了,他就先把那一个消费了。然后在准备消费,队列为空,挂起等待去了(在condwait中解锁,等待条件成立,生产者又能拿到锁)。只有生产者了,生产者申请锁,就一直生产直至队列满。在解锁。依次循环。
解决也很好解决,我们让生产者一开始就等待,那么消费者就先去,队列为空就等待去了。
至此,单消费者,单生产者。就完成了。
多生产者和多消费者,也不难想出,两个角色,两把锁,先让多个进程在外面组内竞争,反正进入的肯定是一个消费者,一个生产者,这两个的互斥关系在put和get中已经维护好了。还想多个进程同步的话,两个条件变量,也可以协同起来。
3.2 一个执行"任务"的应用场景
将它修改,传入自定义对象,是以一个任务结构体,可以执行加法
#pragma once
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>
using namespace std;
class Task{
private:
int a;
int b;
public:
Task()
{}
Task(int a,int b)
:a(a)
,b(b)
{}
int run()
{
return a+b;
}
};
class BlockQueue
{
private:
size_t _cap;
queue<Task> q;
pthread_mutex_t lock;
pthread_cond_t p_cond;
pthread_cond_t c_cond;
public:
BlockQueue(size_t cap)
:_cap(cap)
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&p_cond,NULL);
pthread_cond_init(&c_cond,NULL);
}
bool isFull()
{
return q.size()>=_cap;
}
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
void WakeUpProductor()
{
pthread_cond_signal(&p_cond);
}
void WakeUpConsumer()
{
pthread_cond_signal(&c_cond);
}
void put(const Task& in)
{
LockQueue();
//假如wait失败的话,没有挂起,队列满了,还塞入了数据。
//if的话,挂起成功就挂起,失败就执行下面去了。虽然小概率事件但是仍要注意
//用while的话,wait失败,又重新进入循环,继续尝试等待,总不可能永远失败吧
while(isFull())
{
cout<<"Wake consumer"<<endl;
WakeUpConsumer();
cout<<"productor wait"<<endl;
pthread_cond_wait(&p_cond,&lock);
}
q.push(in);
UnlockQueue();
}
void get(Task& out)
{
LockQueue();
while(q.empty())
{
cout<<"wake up productor"<<endl;
WakeUpProductor();
cout<<"consumer wait"<<endl;
pthread_cond_wait(&c_cond,&lock);
}
out=q.front();
q.pop();
UnlockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&p_cond);
pthread_cond_destroy(&c_cond);
}
};
#include"Block_Queue.hpp"
#include<stdlib.h>
void* productor_run(void* args)
{
BlockQueue* bq=(BlockQueue*)args;
sleep(1);
while(true)
{
sleep(1);
int x=rand()%10+1;
int y=rand()%100+1;
bq->put(Task(x,y));
cout<<"productor: x = "<< x <<" y = "<<y<<" x + y = ? "<<endl;
}
}
void* consumer_run(void* args)
{
BlockQueue* bq=(BlockQueue*)args;
while(true)
{
Task t;
bq->get(t);
cout<<"consume:"<<" x + y = "<<t.run()<< endl;
}
}
int main()
{
BlockQueue *Bq=new BlockQueue(8);
pthread_t p1,p2;
pthread_create(&p1,NULL,productor_run,(void*)Bq);
pthread_create(&p2,NULL,consumer_run,(void*)Bq);
pthread_join(p1,NULL);
pthread_join(p2,NULL);
delete Bq;
return 0;
}
3.2.1 实验现象
3.2.2 实际应用场景
- 管道也是一种经典的生产者消费者模型,它里面封装了同步与互斥。
- 前端网页,注册之后提交,通过网络发到服务器中,服务器保存在后端。线程1,从网络拿数据,保存数据至任务队列,线程2从任务队列取数据,将数据保存至数据库。
以上是关于Linux多线程_(线程同步,基于阻塞队列的生产者消费者模型)的主要内容,如果未能解决你的问题,请参考以下文章
Linux多线程_(Posix信号量实现环形队列生产者消费者模型)
(王道408考研操作系统)第二章进程管理-第三节7:经典同步问题之多生产者与多消费者问题