Linux:详解多线程(线程同步信号量和生产者与消费者模型的实现)
Posted It‘s so simple
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux:详解多线程(线程同步信号量和生产者与消费者模型的实现)相关的知识,希望对你有一定的参考价值。
目录
本篇的内容和我之前写的Linux:详解多线程(线程安全、互斥和死锁)(二)是连在一起的,可以先看完这篇文章,再过来阅读本篇文章。
1. 同步
1.1 前言
我们在上一篇文章中实现了线程的互斥,保证了在一个线程对临界资源进行访问的时候,其他线程是不能对该临界资源进行任何操作的,但是这样还是有一些问题存在的,比如由于是循环的加锁解锁,这些临界资源又可能每次都会被同一个线程所拿到,而其他的线程获取不到该临界资源,这就造成了程序结果的不合理性。
举个例子来说把,假设现在桌子上有一碗饭(临界资源),同时呢,又存在两个人,一个人只负责吃饭(线程A),一个人只负责做饭(线程B),那么,如果在只是实现了互斥的情况下,即每次只能有一个人对这碗饭进行操作,那么就像刚刚上面的分析一样,又可能每次都是做饭的人一直做饭,而吃饭的人不会进行吃,那么碗里的饭就会一直增加。
但是这并不符合我们日常生活的逻辑,按理说,在碗里还有饭的时候,我们是不能够进行做饭的,因为只有一个碗,我们不可能将两份饭放到一个碗中,我们只能等吃饭的人把碗里的饭吃了之后,再将做的饭放入碗中,这样才就保证了吃饭做饭的合理性。那么,我们本篇文章要讲的同步就是用来解决这种问题的。
具体一点就是,同步的作用为:让多个执行流在访问临界资源的时候是合理访问的。
1.2 条件变量
我们通常使用条件变量来实现同步。
条件的变量的本质是一个PCB等待队列再加上一堆接口
PCB等待队列:当线程发现资源不可用的时候,就调用条件变量的等待接口,将自己放到PCB等待队列当中,等待被程序唤醒。
PS:我们一定要明白在多线程的情况下,各个线程是并行运行的。
1.2.1 条件变量的初始化
初始化条件变量也分为两种情况,静态初始化和动态初始化。
我们可以使用man pthread_cond_init
来查询相应的条件变量初始化。
① 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
和互斥锁一样,他也是一个宏定义。
同理,它是不用自己主动释放资源的,是会由操作系统自动回收的。
② 动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond,\\
const pthread_condattr_t *restrict attr);
参数:
cond
:待初始化的"条件变量"的变量,一般情况下,均是传入一个pthread_cond_t
的类型变量的地址attr
:条件变量的属性,一般情况下都是传入一个NULL,采取默认的属性。
返回值:
- 0 : 表示成功
- <0:表示失败
条件变量接口的返回值均是如此,下面就不再对其进行声明了。
1.2.2 条件变量的等待接口
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
参数:
cond
:需要进行等待的条件变量的接口mutex
:是一个互斥锁变量,它的作用是如果一个执行流调用了该接口,就会将执行流对应的PCB放到参数为cond的PCB等待队列当中。
那么问题来了:
① 为什么需要互斥锁?
② 在调用该接口的时候,pthread_cond_wait
函数的实现逻辑?
③ 如果一个线程在等待的时候,当它被唤醒之后,需要做什么事情?
解答:
① 为什么需要互斥锁?
传递互斥锁的原因是需要在
pthread_cond_wait
函数内部进行解锁,在解锁之后,其他执行流就可以获得这把互斥锁。
否则,如果调用pthread_cond_wait
在进行等待的时候,不释放互斥锁,那么其他线程也不会获取到这个互斥锁,整个程序就无法正常的向前运行,换句话说就是我拿着锁进入了等待吃饭的队列中,而做饭的人需要获取我手中的这把锁从而进行做饭,但是现在它拿不到我手中的锁,因此,就会一直循环的申请拿锁却拿不到,程序就会一直卡住。
② 在调用该接口的时候,pthread_cond_wait
函数的实现逻辑?
有三个步骤:
① 放入PCB等待队列中
② 释放互斥锁
③ 等待被唤醒注意这里的①和②的顺序是不能乱的,如果为先释放互斥锁,那么有可能在线程A释放的一瞬间,并且还没有入队的时候,发生了线程切换,B拿到了这把互斥锁,然后B去进行吃饭,在吃完之后,进行解锁并通知PCB等待队列进行做饭,但此时PCB等待队列中并没有等待做饭的线程A,然后B再次拿到锁,发现碗中没有饭,然后就进入PCB等待队列中等待被唤醒,这个时候线程A被切换进来,然后线程A执行入队操作,最终线程A和线程B都在PCB等待队列中,程序也就无法正常的运行的了。因此,这里的顺序是不能乱的。
③ 如果一个线程在等待的时候,当它被唤醒之后,需要做什么事情?
会做两件事情:
① 先移出PCB等待队列
② 再抢互斥锁注意这里再抢锁的时候,若是抢到了,那么
pthread_cond_wait
函数也就返回了,若是没有抢到,该函数就不会返回,而是一直等待的抢锁。
扩展:条件变量的等待接口中也有pthread_cond_timedwait
,即带有超时时间的接口
1.2.3 条件变量的唤醒接口
① int pthread_cond_signal(pthread_cond_t *cond);
参数:
cond
:传入待唤醒的条件变量的地址
作用:
通知(唤醒)PCB等待队列中的线程,如果被通知(被唤醒)的线程接收到了,则就会从PCB等待队列中进行出队操作,正常的执行代码。若是PCB等待队列中有多个线程,则至少随机唤醒一个PCB等待队列中的线程。
② int pthread_cond_broadcast(pthread_cond_t *cond);
参数:
cond
:传入待唤醒的条件变量的地址
作用:
pthread_cond_broadcast
的作用和pthread_cond_signal
的作用基本一样,但有一点不同的是,如果PCB等待队列中有多个线程,则他会唤醒所有的PCB等待队列中的线程。
1.2.4 条件变量的销毁接口
int pthread_cond_destroy(pthread_cond_t *cond);
参数:
cond
:传入待销毁的条件变量的地址
1.2.5 条件变量的实战探索
现在我们来继续进行前言中提到的场景,有一个碗和两个人,首先我们应该保证互斥,即同一时间中只有一个人能对碗进行操作,其次我们还应该保证碗中资源的合理性,即碗中只能有0和1两种状态,0代表碗是空的,1代表碗是满的,这点我们使用条件变量来对其进行实现。
代码如下:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define PTHREADNUM 1
int bowl = 1;
pthread_mutex_t g_lock;
pthread_cond_t g_cond;
//只负责吃的线程
void* MyPthreadEat(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
if(bowl < 1)
{
pthread_cond_wait(&g_cond,&g_lock);
}
//走到这说明碗中现在是有饭的
--bowl;
printf("i eat %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责做的PCB等待队列发出信号,
//唤醒正在等待通知的做的线程
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
//只负责做的线程
void* MyPthreadMake(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
if(bowl >= 1)
{
pthread_cond_wait(&g_cond,&g_lock);
}
//走到这说明碗中现在是没饭的
++bowl;
printf("i make %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责吃的PCB等待队列发出信号,
//唤醒正在等待通知的吃的线程
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
int main()
{
pthread_mutex_init(&g_lock,NULL);
pthread_cond_init(&g_cond,NULL);
pthread_t tid_A[PTHREADNUM] , tid_B[PTHREADNUM];
for(int i = 0;i < PTHREADNUM; ++i)
{
int ret = pthread_create(&tid_A[i],NULL,MyPthreadEat,NULL);
if(ret < 0)
{
perror("pthread_create");
return 0;
}
ret = pthread_create(&tid_B[i],NULL,MyPthreadMake,NULL);
if(ret < 0)
{
perror("pthread_create");
return 0;
}
}
for(int i = 0; i < PTHREADNUM; ++i)
{
pthread_join(tid_A[i],NULL);
pthread_join(tid_B[i],NULL);
}
pthread_mutex_destroy(&g_lock);
pthread_cond_destroy(&g_cond);
return 0;
}
结果验证:
由于产生的结果过多,我们将其重定向到1.txt中进查看
当只有一个线程负责吃,一个线程负责做,这样做好像成功了,因为既实现了互斥,又保证了对碗资源访问的合理性,但是,若是有两个线程负责吃,两个线程负责做呢?
看看结果,发现结果又不对了,这只有一个碗,但是却出现了做了四碗饭的情况,也出现了吃了四碗饭的情况,甚至都有-1出现了,因此,这样做肯定是不对的,我们还需要对其进行改进。
我们看到,只有一个碗,但是却出现了不合理的情况,我们此时再回想一下,pthread_cond_wait
函数被唤醒之后,会做什么事情:① 先移出PCB等待队列,② 抢互斥锁。然后我们再来看看有pthread_cond_wait
函数的两个语句。
是了,假设你在吃完饭之后,唤醒等待做饭的线程,由于存在两个做饭的线程,可能有一个线程MakeA在PCB等待队列中,而另外一个MakeB阻塞在拿锁的阶段,当吃饭的线程刚释放掉这把互斥锁,MakeB就拿到了这把锁,然后做饭,最后释放锁(此时bowl = 1),由于是while循环,就会再上去进行抢锁,那么现在假设是MakeA抢到了这把互斥锁,由于没有对其进行判断,因此,MakeA会直接对碗中的资源进++操作,(此时bowl=2),这样就造成了资源的不合理性。
那么如何进行修改呢?我们只需将if
改为while
即可,让被唤醒后的线程再进行判断即可,这样就可以保证资源的合理性。
代码改进:
//只负责吃的线程
void* MyPthreadEat(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
while(bowl < 1)
{
pthread_cond_wait(&g_cond,&g_lock);
}
//走到这说明碗中现在是有饭的
--bowl;
printf("i eat %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责做的PCB等待队列发出信号,
//唤醒正在等待通知的做的线程
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
//只负责做的线程
void* MyPthreadMake(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
while(bowl >= 1)
{
pthread_cond_wait(&g_cond,&g_lock);
}
//走到这说明碗中现在是没有饭的
++bowl;
printf("i make %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责吃的PCB等待队列发出信号,
//唤醒正在等待通知的吃的线程
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
运行结果:
虽然说好像还有问题,但是我们能看见的是,至少没有再出现刚刚那种资源访问不合理的情况了,但是为什么会卡死呢?我们紧接着对其进行分析。
我们使用ps aux | grep xxx
的命令来查看当前进程的进程号,再使用pstack [pid]
的命名擦好看当前进程的调用堆栈信息
发现四个工作线程均阻塞在了pthread_cond_wait
函数内部。那这是为什么呢?
总结一下前面的我们可以发现,当我们在使用的时候只有一个条件变量,若是吃和做均只有一个线程那还好,每次唤醒,都可将互斥的那个线程唤醒,程序这样是没问题的。
但是若吃饭和做饭的人有多个的话,一个条件变量就会造成混乱,因为当你对碗操作完之后,你不知道唤醒的是PCB等待队列中的哪一个,(本次代码使用的是pthread_cond_signal
函数,当PCB等待队列中有多个等待的线程的时候,至少会随机唤醒一个)。
因此,可能会存在这样的一个情况:一个负责吃的线程正在吃饭,而剩余的其他线程均在PCB等待队列中进行等待,当该线程吃完之后,pthread_cond_signal
函数随机的唤醒了另一个负责吃的线程,然后该线程发现碗中没有饭,就又进入PCB等待队列等待被唤醒,由于没有唤醒负责做饭的线程出来做饭,因此,在外面的负责吃的线程均进入了PCB等待队列等待被唤醒,因此,四个线程均会被阻塞在pthread_cond_wait
函数里面。
解决办法:
① 使用
pthread_cond_broadcast
函数,该函数会将PCB等待队列中的所有等待线程一次性全部唤醒。
② 使用两个条件变量,一个条件变量的PCB等待队列中存放负责吃的线程,另一个条件变量存放负责做的线程。
解法①的验证:
将
pthread_cond_signal
函数改为pthread_cond_broadcast
函数
看一下程序的运行结果:
发现程序是正确的,结果也符合我们的预期,多个线程对临界资源的访问是合理的,并且也实现了互斥访问。但是这种方法尽量不要滥用,因为线程在PCB等待队列中是不会占用CPU的资源的,若每次都使用broadcast来唤醒所有的线程,那么对CPU的资源的消耗也就进一步提升了。
解法②的验证:
使用两个条件变量
看一下程序运行的结果:
这个结果也是符合我们预期的,但是这两种方法,我们经常使用的是第二种解决办法,因为这种方法逻辑清晰,易于理解。
真正没有问题的代码如下:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define PTHREADNUM 2
int bowl = 1;
pthread_mutex_t g_lock;
pthread_cond_t g_condMake;
pthread_cond_t g_condEat;
//只负责吃的线程
void* MyPthreadEat(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
//如果当碗中没饭的时候,则将该线程放入到PCB等待队列中,
//等待做饭的人做饭并将其唤醒
while(bowl < 1)
{
pthread_cond_wait(&g_condEat,&g_lock);
}
//走到这说明碗中现在是有饭的
--bowl;
printf("i eat %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责做的PCB等待队列发出信号,
//唤醒正在等待通知的做的线程
//pthread_cond_broadcast(&g_cond);
pthread_cond_signal(&g_condMake);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
//只负责做的线程
void* MyPthreadMake(void* arg)
{
while(1)
{
pthread_mutex_lock(&g_lock);
//如果当碗中有饭的时候,则将该线程放入到PCB等待队列中,
//等待吃饭的人吃饭并将其唤醒
while(bowl >= 1)
{
pthread_cond_wait(&g_condMake,&g_lock);
}
//走到这说明碗中现在是没饭的
++bowl;
printf("i make %d,i am %p\\n",bowl,pthread_self());
//在操作完之后,需要对负责吃的PCB等待队列发出信号,
//唤醒正在等待通知的吃的线程
//pthread_cond_broadcast(&g_cond);
pthread_cond_signal(&g_condEat);
pthread_mutex_unlock(&g_lock);
}
return NULL;
}
int main()
{
pthread_mutex_init(&g_lock,NULL);
pthread_cond_init(&g_condEat,NULL);
pthread_cond_init(&g_condMake,NULL);
pthread_t tid_A[PTHREADNUM] , tid_B[PTHREADNUM];
for(int i = 0;i < PTHREADNUM; ++i)
{
int ret = pthread_create(&tid_A[i],NULL,MyPthreadEat,NULL);
if(ret < 0)
{
perror("pthread_create");
return 0;
}
ret = pthread_create(&tid_B[i],NULL,MyPthreadMake,NULL);
if(ret < 0)
{
perror("pthread_create");
return 0;
}
}
for(int i = 0; i < PTHREADNUM; ++i)
{
pthread_join(tid_A[i],NULL);
pthread_join(tid_B[i],NULL);
}
pthread_mutex_destroy(&g_lock);
pthread_cond_destroy(&g_condEat);
pthread_cond_destroy(&g_condMake);
return 0;
}
2. 信号量
2.1 信号量的相关概念
① 信号量本质上是一个PCB等待队列+计数器。
计数器:对资源的计数,会影响信号量的等待接口和发送接口的(唤醒接口)的逻辑。
简单来说,假设我们指定当前计算机资源有8份,并且目前均被占用,现在若有一个线程想要使用该计算机的资源,当调用信号量的等待接口时,信号量内部会自动的对计算机的资源进行判断(实际上就是对信号量中的计数器进行判断),若是资源数小于0,则就将其放入PCB等待队列中,若是资源数大于0,则直接分配相应资源。当对该资源操作完之后,就调用信号量的发送接口,对计数器进行加1,并且通知PCB等待队列中的线程出队获取资源。这就是使用信号量对临界资源操作的大概流程。
② 信号量和条件变量的差距就是:
条件变量就是需要程序员自己把握资源的数量,信号量就是会自己维护资源的数量,只需要在初始化信号量的时候,指定资源的数量。
③ 信号量既可以完成同步,也可以完成互斥
互斥:
将资源计数器的初始值设置为1,线程A若拿到信号量,则线程B一定拿不到。线程A拿到信号量,就会使信号量中计数器进行减一,当线程B拿信号量的时候,判断计数器中的值为0,就会进入到PCB的等待队列中,因此也就实现了互斥。
同步:
假设当前有一个读的线程,有一个写的线程,它们均要对一个数组进行操作,那么此时,我们就可以定义两个信号量,一个读信号量,用来表示当前数组是否可读,并且初始化计数器为0,表示目前是不可读的,因为数组中没有值;另一个信号量为表示当前数组能写多少的写信号量,并且计数器初始化为数组的长度,因为数组是空的。
那么该如何实现同步呢?当我们写的线程每往数组中写一个元素的时候,首先对计数器进行减1操作,其次再唤醒读的信号量,读的信号被唤醒之后,就会对计数器进行加1操作,然后读的线程不再阻塞,就可以正常读。
然后在读的时候,每读一个数,读信号的计数器就会减1变为0,然后读信号就阻塞掉,并且会唤醒写的信号量,对写的信号量的计数器进行加1操作。这样就实现了一个同步的特点。这里需要注意的是,当往数组中写的时候,写的下标pos = (pos + 1) % 数组的长度
我会在后面的实战代码中对该功能进行实现。
那么,这里有两个面试题:
① 当一个线程调用的发送接口之后,资源技术器对其进行加1操作,若此时加1操作之后的资源计数器的结果还是小于0,此时还需要通知PCB等待队列吗?
解答:是需要的,因为需要告知PCB等待队列中目前还是有PCB进行等待的。
② 当一个线程调用发送接口之后,资源技术器进行加1操作,若此时加1操作之后的资源计数器的结果还是大于0,那么,此时还需要通知PCB等待队列吗?
解答:不需要,若是当程序中还存在着资源,那么就不需要再去通知PCB等待队列了,因为并没有太大的意义。
2.2 信号量的接口
信号量的类型为:sem_t
信号量接口包含的头文件为:#include <semaphore.h>
2.2.1 信号量的初始化
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem
:待初始化的信号量的指针pshared
:用于进程间还是用于线程间。(线程间:0,进程间:非0)value
:初始化资源的数量(实际上就是对计数器的初始化)
2.2.2 信号量的等待接口
int sem_wait(sem_t* sem);
参数:
sem
:待要进行等待的信号量
作用:
- 会对当前资源计数器进行减1的操作
- 判断当前资源计数器的值是否大于0
① 若是大于0,则直接将返回。
② 若是小于0,则将线程放到PCB等待队列中,并阻塞起来。
扩展:信号量的等待接口中也有sem_timedwait
,即带有超时时间的接口和sem_trywait
,即非阻塞的等待接口。
2.2.3 信号量的发送接口
int sem_post(sem_t *sem);
参数:
sem
:待要进行唤醒的信号量
作用:
- 会对当前资源计数器进行加1的操作
- 判断当前资源计数器的值是否小于0
① 若是小于0,则通知PCB等待队列
② 若是大于0,则不会通知PCB等待队列
2.2.4 信号量的销毁接口
int sem_destroy(sem_t *sem);
参数:
sem
:待要进行销毁的信号量
3. 生产者与消费者模型
3.1 相关概念
首先我们需要知道要实现一个生产者与消费者模型,只需要执行123规则即可。
① 1 :代表一个线程安全的队列
- 队列:遵循先进先出的原则。
- 线程安全:当前队列在被其他线程操作的时候,出队操作和入队操作是保证原子性的。换句话说就是在同一时刻只有一个人能操作该队列,出队和入队时互斥的。
② 2:代表两种角色的线程
- 消费者线程:从线程安全队列中获取元素进行处理。
- 生产者线程:生产元素放到线程安全队列中进行处理。
③ 3:三种关系
- 消费者与消费者互斥
- 生产者与生产者互斥
- 消费者和生产资互斥加同步
①消费者在对资源进行操作的时候,生产者不能进行访问,这是实现了互斥。
②只有当生产者往线程安全的队列中生产元素的时候,消费者才可以从线程安全的队列中获取元素进行处理,这是实现了同步。
生产者与消费者的作用是:
① 支持忙闲不均,可以提高程序的运行效率。
② 队列中提供了一个缓冲区的作用,可以缓冲待要处理的元素。
3.2 使用条件变量和互斥锁实现生产者和消费者模型
首先要实现生产者与消费者模型,执行123规则即可。
因此,我们应该首先定义一个类来实现一个线程安全的队列,为了保证线程安全,我们应在成员变量中给出一个互斥锁变量。为了保证生产者和消费者对资源访问的合理性,我们定义两个条件变量,用来实现同步。
两种角色的线程我们在主线程中进行初始化。
三种关系我们使用一个互斥锁和两个条件变量即可实现。
并且,我们还可以规定,Push操作为生产者线程调用的接口,Pop操作为消费者线程调用的接口。
代码如下:
#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
#define PTHREADNUM 4
//实现一个线程安全的队列
class SafeQueue
{
public:
//初始化队列
SafeQueue() : que_(),capacity_(10)
{
pthread_mutex_init(&lock_,NULL);
pthread_cond_init(&prod_,NULL);
pthread_cond_init(&cons_,NULL);
}
//销毁队列
~SafeQueue()
{
pthread_mutex_destroy(&lock_);
pthread_cond_destroy(&prod_);
pthread_cond_destroy(&cons_);
}
Linux多线程中互斥锁读写锁自旋锁条件变量信号量详解
Linux篇第十五篇——多线程(生产消费模型+POSIX信号量)