Linux:详解多线程(线程同步信号量和生产者与消费者模型的实现)

Posted It‘s so simple

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux:详解多线程(线程同步信号量和生产者与消费者模型的实现)相关的知识,希望对你有一定的参考价值。


本篇的内容和我之前写的Linux:详解多线程(线程安全、互斥和死锁)(二)是连在一起的,可以先看完这篇文章,再过来阅读本篇文章。

1. 同步

1.1 前言

我们在上一篇文章中实现了线程的互斥,保证了在一个线程对临界资源进行访问的时候,其他线程是不能对该临界资源进行任何操作的,但是这样还是有一些问题存在的,比如由于是循环的加锁解锁,这些临界资源又可能每次都会被同一个线程所拿到,而其他的线程获取不到该临界资源,这就造成了程序结果的不合理性。

举个例子来说把,假设现在桌子上有一碗饭(临界资源),同时呢,又存在两个人,一个人只负责吃饭(线程A),一个人只负责做饭(线程B),那么,如果在只是实现了互斥的情况下,即每次只能有一个人对这碗饭进行操作,那么就像刚刚上面的分析一样,又可能每次都是做饭的人一直做饭,而吃饭的人不会进行吃,那么碗里的饭就会一直增加。

但是这并不符合我们日常生活的逻辑,按理说,在碗里还有饭的时候,我们是不能够进行做饭的,因为只有一个碗,我们不可能将两份饭放到一个碗中,我们只能等吃饭的人把碗里的饭吃了之后,再将做的饭放入碗中,这样才就保证了吃饭做饭的合理性。那么,我们本篇文章要讲的同步就是用来解决这种问题的。

具体一点就是,同步的作用为:让多个执行流在访问临界资源的时候是合理访问的

1.2 条件变量

我们通常使用条件变量来实现同步。

条件的变量的本质是一个PCB等待队列再加上一堆接口

PCB等待队列:当线程发现资源不可用的时候,就调用条件变量的等待接口,将自己放到PCB等待队列当中,等待被程序唤醒。

PS:我们一定要明白在多线程的情况下,各个线程是并行运行的。

1.2.1 条件变量的初始化

初始化条件变量也分为两种情况,静态初始化动态初始化
我们可以使用man pthread_cond_init来查询相应的条件变量初始化。

静态初始化

pthread_cond_t cond = PTHREAD_COND_INITIALIZER

和互斥锁一样,他也是一个宏定义。
在这里插入图片描述
同理,它是不用自己主动释放资源的,是会由操作系统自动回收的。

动态初始化

int pthread_cond_init(pthread_cond_t *restrict cond,\\
const pthread_condattr_t *restrict attr);

参数:

  • cond:待初始化的"条件变量"的变量,一般情况下,均是传入一个pthread_cond_t的类型变量的地址
  • attr:条件变量的属性,一般情况下都是传入一个NULL,采取默认的属性。

返回值:

  • 0 : 表示成功
  • <0:表示失败

条件变量接口的返回值均是如此,下面就不再对其进行声明了。

1.2.2 条件变量的等待接口

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)

参数:

  • cond:需要进行等待的条件变量的接口
  • mutex:是一个互斥锁变量,它的作用是如果一个执行流调用了该接口,就会将执行流对应的PCB放到参数为cond的PCB等待队列当中。

那么问题来了:

① 为什么需要互斥锁?
② 在调用该接口的时候,pthread_cond_wait函数的实现逻辑?
③ 如果一个线程在等待的时候,当它被唤醒之后,需要做什么事情?

解答:

① 为什么需要互斥锁?

传递互斥锁的原因是需要在pthread_cond_wait函数内部进行解锁,在解锁之后,其他执行流就可以获得这把互斥锁。
否则,如果调用pthread_cond_wait在进行等待的时候,不释放互斥锁,那么其他线程也不会获取到这个互斥锁,整个程序就无法正常的向前运行,换句话说就是我拿着锁进入了等待吃饭的队列中,而做饭的人需要获取我手中的这把锁从而进行做饭,但是现在它拿不到我手中的锁,因此,就会一直循环的申请拿锁却拿不到,程序就会一直卡住。

② 在调用该接口的时候,pthread_cond_wait函数的实现逻辑?

有三个步骤:

① 放入PCB等待队列中
② 释放互斥锁
③ 等待被唤醒

注意这里的①和②的顺序是不能乱的,如果为先释放互斥锁,那么有可能在线程A释放的一瞬间,并且还没有入队的时候,发生了线程切换,B拿到了这把互斥锁,然后B去进行吃饭,在吃完之后,进行解锁并通知PCB等待队列进行做饭,但此时PCB等待队列中并没有等待做饭的线程A,然后B再次拿到锁,发现碗中没有饭,然后就进入PCB等待队列中等待被唤醒,这个时候线程A被切换进来,然后线程A执行入队操作,最终线程A和线程B都在PCB等待队列中,程序也就无法正常的运行的了。因此,这里的顺序是不能乱的。

③ 如果一个线程在等待的时候,当它被唤醒之后,需要做什么事情?

会做两件事情:

① 先移出PCB等待队列
② 再抢互斥锁

注意这里再抢锁的时候,若是抢到了,那么pthread_cond_wait函数也就返回了,若是没有抢到,该函数就不会返回,而是一直等待的抢锁。

扩展:条件变量的等待接口中也有pthread_cond_timedwait,即带有超时时间的接口

1.2.3 条件变量的唤醒接口

int pthread_cond_signal(pthread_cond_t *cond);

参数:

  • cond:传入待唤醒的条件变量的地址

作用

通知(唤醒)PCB等待队列中的线程,如果被通知(被唤醒)的线程接收到了,则就会从PCB等待队列中进行出队操作,正常的执行代码。若是PCB等待队列中有多个线程,则至少随机唤醒一个PCB等待队列中的线程。

int pthread_cond_broadcast(pthread_cond_t *cond);

参数

  • cond:传入待唤醒的条件变量的地址

作用

pthread_cond_broadcast的作用和pthread_cond_signal的作用基本一样,但有一点不同的是,如果PCB等待队列中有多个线程,则他会唤醒所有的PCB等待队列中的线程。

1.2.4 条件变量的销毁接口

int pthread_cond_destroy(pthread_cond_t *cond);

参数

  • cond:传入待销毁的条件变量的地址

1.2.5 条件变量的实战探索

现在我们来继续进行前言中提到的场景,有一个碗和两个人,首先我们应该保证互斥,即同一时间中只有一个人能对碗进行操作,其次我们还应该保证碗中资源的合理性,即碗中只能有0和1两种状态,0代表碗是空的,1代表碗是满的,这点我们使用条件变量来对其进行实现。

代码如下:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define PTHREADNUM 1

int bowl = 1;

pthread_mutex_t g_lock;
pthread_cond_t g_cond;

//只负责吃的线程
void* MyPthreadEat(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);

        if(bowl < 1)
        {
            pthread_cond_wait(&g_cond,&g_lock);
        }
        //走到这说明碗中现在是有饭的
        --bowl;
        printf("i eat %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责做的PCB等待队列发出信号,
        //唤醒正在等待通知的做的线程
        pthread_cond_signal(&g_cond);
        pthread_mutex_unlock(&g_lock);
    }
    return NULL;
}

//只负责做的线程
void* MyPthreadMake(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);

        if(bowl >= 1)
        {
            pthread_cond_wait(&g_cond,&g_lock);
        }
        //走到这说明碗中现在是没饭的
        ++bowl;
        printf("i make %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责吃的PCB等待队列发出信号,
        //唤醒正在等待通知的吃的线程
        pthread_cond_signal(&g_cond);
        pthread_mutex_unlock(&g_lock);
    }
    
    return NULL;
}

int main()
{
    pthread_mutex_init(&g_lock,NULL);
    pthread_cond_init(&g_cond,NULL);

    pthread_t tid_A[PTHREADNUM] , tid_B[PTHREADNUM];

    for(int i = 0;i < PTHREADNUM; ++i)
    {
        int ret = pthread_create(&tid_A[i],NULL,MyPthreadEat,NULL);
        if(ret < 0)
        {
            perror("pthread_create");
            return 0;
        }
        ret = pthread_create(&tid_B[i],NULL,MyPthreadMake,NULL);
        if(ret < 0)
        {
            perror("pthread_create");
            return 0;
        }    
    }

    for(int i = 0; i < PTHREADNUM; ++i)
    {
        pthread_join(tid_A[i],NULL);
        pthread_join(tid_B[i],NULL);
    }
    pthread_mutex_destroy(&g_lock);
    pthread_cond_destroy(&g_cond);
    return 0;
}

结果验证:

由于产生的结果过多,我们将其重定向到1.txt中进查看
在这里插入图片描述
在这里插入图片描述
当只有一个线程负责吃,一个线程负责做,这样做好像成功了,因为既实现了互斥,又保证了对碗资源访问的合理性,但是,若是有两个线程负责吃,两个线程负责做呢?
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
看看结果,发现结果又不对了,这只有一个碗,但是却出现了做了四碗饭的情况,也出现了吃了四碗饭的情况,甚至都有-1出现了,因此,这样做肯定是不对的,我们还需要对其进行改进。

我们看到,只有一个碗,但是却出现了不合理的情况,我们此时再回想一下,pthread_cond_wait函数被唤醒之后,会做什么事情:① 先移出PCB等待队列,② 抢互斥锁。然后我们再来看看有pthread_cond_wait函数的两个语句。

在这里插入图片描述
在这里插入图片描述
是了,假设你在吃完饭之后,唤醒等待做饭的线程,由于存在两个做饭的线程,可能有一个线程MakeA在PCB等待队列中,而另外一个MakeB阻塞在拿锁的阶段,当吃饭的线程刚释放掉这把互斥锁,MakeB就拿到了这把锁,然后做饭,最后释放锁(此时bowl = 1),由于是while循环,就会再上去进行抢锁,那么现在假设是MakeA抢到了这把互斥锁,由于没有对其进行判断,因此,MakeA会直接对碗中的资源进++操作,(此时bowl=2),这样就造成了资源的不合理性。

那么如何进行修改呢?我们只需将if改为while即可,让被唤醒后的线程再进行判断即可,这样就可以保证资源的合理性。

代码改进

//只负责吃的线程
void* MyPthreadEat(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);

        while(bowl < 1)
        {
            pthread_cond_wait(&g_cond,&g_lock);
        }
        //走到这说明碗中现在是有饭的
        --bowl;
        printf("i eat %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责做的PCB等待队列发出信号,
        //唤醒正在等待通知的做的线程
        pthread_cond_signal(&g_cond);
        pthread_mutex_unlock(&g_lock);
    }
    return NULL;
}

//只负责做的线程
void* MyPthreadMake(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);

        while(bowl >= 1)
        {
            pthread_cond_wait(&g_cond,&g_lock);
        }
        //走到这说明碗中现在是没有饭的
        ++bowl;
        printf("i make %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责吃的PCB等待队列发出信号,
        //唤醒正在等待通知的吃的线程
        pthread_cond_signal(&g_cond);
        pthread_mutex_unlock(&g_lock);
    }
    
    return NULL;
}

运行结果

在这里插入图片描述
虽然说好像还有问题,但是我们能看见的是,至少没有再出现刚刚那种资源访问不合理的情况了,但是为什么会卡死呢?我们紧接着对其进行分析。

我们使用ps aux | grep xxx的命令来查看当前进程的进程号,再使用pstack [pid]的命名擦好看当前进程的调用堆栈信息
在这里插入图片描述
发现四个工作线程均阻塞在了pthread_cond_wait函数内部。那这是为什么呢?

总结一下前面的我们可以发现,当我们在使用的时候只有一个条件变量,若是吃和做均只有一个线程那还好,每次唤醒,都可将互斥的那个线程唤醒,程序这样是没问题的。

但是若吃饭和做饭的人有多个的话,一个条件变量就会造成混乱,因为当你对碗操作完之后,你不知道唤醒的是PCB等待队列中的哪一个,(本次代码使用的是pthread_cond_signal函数,当PCB等待队列中有多个等待的线程的时候,至少会随机唤醒一个)。

因此,可能会存在这样的一个情况:一个负责吃的线程正在吃饭,而剩余的其他线程均在PCB等待队列中进行等待,当该线程吃完之后,pthread_cond_signal函数随机的唤醒了另一个负责吃的线程,然后该线程发现碗中没有饭,就又进入PCB等待队列等待被唤醒,由于没有唤醒负责做饭的线程出来做饭,因此,在外面的负责吃的线程均进入了PCB等待队列等待被唤醒,因此,四个线程均会被阻塞在pthread_cond_wait函数里面。

解决办法:

① 使用pthread_cond_broadcast函数,该函数会将PCB等待队列中的所有等待线程一次性全部唤醒
② 使用两个条件变量,一个条件变量的PCB等待队列中存放负责吃的线程,另一个条件变量存放负责做的线程

解法①的验证

pthread_cond_signal函数改为pthread_cond_broadcast函数
在这里插入图片描述
看一下程序的运行结果:
在这里插入图片描述
发现程序是正确的,结果也符合我们的预期,多个线程对临界资源的访问是合理的,并且也实现了互斥访问。但是这种方法尽量不要滥用,因为线程在PCB等待队列中是不会占用CPU的资源的,若每次都使用broadcast来唤醒所有的线程,那么对CPU的资源的消耗也就进一步提升了。

解法②的验证

使用两个条件变量
在这里插入图片描述
看一下程序运行的结果:
在这里插入图片描述
这个结果也是符合我们预期的,但是这两种方法,我们经常使用的是第二种解决办法,因为这种方法逻辑清晰,易于理解。

真正没有问题的代码如下:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>



#define PTHREADNUM 2

int bowl = 1;

pthread_mutex_t g_lock;
pthread_cond_t g_condMake;
pthread_cond_t g_condEat;

//只负责吃的线程
void* MyPthreadEat(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        //如果当碗中没饭的时候,则将该线程放入到PCB等待队列中,
        //等待做饭的人做饭并将其唤醒
        while(bowl < 1)
        {
            pthread_cond_wait(&g_condEat,&g_lock);
        }
        //走到这说明碗中现在是有饭的
        --bowl;
        printf("i eat %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责做的PCB等待队列发出信号,
        //唤醒正在等待通知的做的线程
        //pthread_cond_broadcast(&g_cond);
        pthread_cond_signal(&g_condMake);
        pthread_mutex_unlock(&g_lock);
    }
    return NULL;
}

//只负责做的线程
void* MyPthreadMake(void* arg)
{
    while(1)
    {
        pthread_mutex_lock(&g_lock);
        //如果当碗中有饭的时候,则将该线程放入到PCB等待队列中,
        //等待吃饭的人吃饭并将其唤醒
        while(bowl >= 1)
        {
            pthread_cond_wait(&g_condMake,&g_lock);
        }
        //走到这说明碗中现在是没饭的
        ++bowl;
        printf("i make %d,i am %p\\n",bowl,pthread_self());

        //在操作完之后,需要对负责吃的PCB等待队列发出信号,
        //唤醒正在等待通知的吃的线程
        //pthread_cond_broadcast(&g_cond);
        pthread_cond_signal(&g_condEat);
        pthread_mutex_unlock(&g_lock);
    }
    
    return NULL;
}

int main()
{
    pthread_mutex_init(&g_lock,NULL);
    pthread_cond_init(&g_condEat,NULL);
    pthread_cond_init(&g_condMake,NULL);

    pthread_t tid_A[PTHREADNUM] , tid_B[PTHREADNUM];

    for(int i = 0;i < PTHREADNUM; ++i)
    {
        int ret = pthread_create(&tid_A[i],NULL,MyPthreadEat,NULL);
        if(ret < 0)
        {
            perror("pthread_create");
            return 0;
        }
        ret = pthread_create(&tid_B[i],NULL,MyPthreadMake,NULL);
        if(ret < 0)
        {
            perror("pthread_create");
            return 0;
        }    
    }

    for(int i = 0; i < PTHREADNUM; ++i)
    {
        pthread_join(tid_A[i],NULL);
        pthread_join(tid_B[i],NULL);
    }
    pthread_mutex_destroy(&g_lock);
    pthread_cond_destroy(&g_condEat);
    pthread_cond_destroy(&g_condMake);
    return 0;
}

2. 信号量

2.1 信号量的相关概念

信号量本质上是一个PCB等待队列+计数器

计数器:对资源的计数,会影响信号量的等待接口和发送接口的(唤醒接口)的逻辑。

简单来说,假设我们指定当前计算机资源有8份,并且目前均被占用,现在若有一个线程想要使用该计算机的资源,当调用信号量的等待接口时,信号量内部会自动的对计算机的资源进行判断(实际上就是对信号量中的计数器进行判断),若是资源数小于0,则就将其放入PCB等待队列中,若是资源数大于0,则直接分配相应资源。当对该资源操作完之后,就调用信号量的发送接口,对计数器进行加1,并且通知PCB等待队列中的线程出队获取资源。这就是使用信号量对临界资源操作的大概流程。

② 信号量和条件变量的差距就是:

条件变量就是需要程序员自己把握资源的数量信号量就是会自己维护资源的数量,只需要在初始化信号量的时候,指定资源的数量

信号量既可以完成同步,也可以完成互斥

互斥:

将资源计数器的初始值设置为1,线程A若拿到信号量,则线程B一定拿不到。线程A拿到信号量,就会使信号量中计数器进行减一,当线程B拿信号量的时候,判断计数器中的值为0,就会进入到PCB的等待队列中,因此也就实现了互斥。

同步:

假设当前有一个读的线程,有一个写的线程,它们均要对一个数组进行操作,那么此时,我们就可以定义两个信号量,一个读信号量,用来表示当前数组是否可读,并且初始化计数器为0,表示目前是不可读的,因为数组中没有值;另一个信号量为表示当前数组能写多少的写信号量,并且计数器初始化为数组的长度,因为数组是空的。

那么该如何实现同步呢?当我们写的线程每往数组中写一个元素的时候,首先对计数器进行减1操作,其次再唤醒读的信号量,读的信号被唤醒之后,就会对计数器进行加1操作,然后读的线程不再阻塞,就可以正常读。

然后在读的时候,每读一个数,读信号的计数器就会减1变为0,然后读信号就阻塞掉,并且会唤醒写的信号量,对写的信号量的计数器进行加1操作。这样就实现了一个同步的特点。这里需要注意的是,当往数组中写的时候,写的下标pos = (pos + 1) % 数组的长度

我会在后面的实战代码中对该功能进行实现。

那么,这里有两个面试题:
① 当一个线程调用的发送接口之后,资源技术器对其进行加1操作,若此时加1操作之后的资源计数器的结果还是小于0,此时还需要通知PCB等待队列吗?

解答:是需要的,因为需要告知PCB等待队列中目前还是有PCB进行等待的。

② 当一个线程调用发送接口之后,资源技术器进行加1操作,若此时加1操作之后的资源计数器的结果还是大于0,那么,此时还需要通知PCB等待队列吗?

解答:不需要,若是当程序中还存在着资源,那么就不需要再去通知PCB等待队列了,因为并没有太大的意义。

2.2 信号量的接口

信号量的类型为:sem_t

信号量接口包含的头文件为:#include <semaphore.h>

2.2.1 信号量的初始化

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:待初始化的信号量的指针
  • pshared:用于进程间还是用于线程间。(线程间:0,进程间:非0)
  • value:初始化资源的数量(实际上就是对计数器的初始化)

2.2.2 信号量的等待接口

int sem_wait(sem_t* sem);

参数:

  • sem:待要进行等待的信号量

作用

  • 会对当前资源计数器进行减1的操作
  • 判断当前资源计数器的值是否大于0

① 若是大于0,则直接将返回。
② 若是小于0,则将线程放到PCB等待队列中,并阻塞起来。

扩展:信号量的等待接口中也有sem_timedwait,即带有超时时间的接口和sem_trywait,即非阻塞的等待接口。

2.2.3 信号量的发送接口

int sem_post(sem_t *sem);

参数:

  • sem:待要进行唤醒的信号量

作用

  • 会对当前资源计数器进行加1的操作
  • 判断当前资源计数器的值是否小于0

① 若是小于0,则通知PCB等待队列
② 若是大于0,则不会通知PCB等待队列

2.2.4 信号量的销毁接口

int sem_destroy(sem_t *sem);

参数:

  • sem:待要进行销毁的信号量

3. 生产者与消费者模型

3.1 相关概念

首先我们需要知道要实现一个生产者与消费者模型,只需要执行123规则即可

1代表一个线程安全的队列

  • 队列:遵循先进先出的原则
  • 线程安全:当前队列在被其他线程操作的时候,出队操作和入队操作是保证原子性的。换句话说就是在同一时刻只有一个人能操作该队列,出队和入队时互斥的。

2代表两种角色的线程

  • 消费者线程:从线程安全队列中获取元素进行处理。
  • 生产者线程:生产元素放到线程安全队列中进行处理。

3三种关系

  • 消费者与消费者互斥
  • 生产者与生产者互斥
  • 消费者和生产资互斥加同步
    ①消费者在对资源进行操作的时候,生产者不能进行访问,这是实现了互斥。
    ②只有当生产者往线程安全的队列中生产元素的时候,消费者才可以从线程安全的队列中获取元素进行处理,这是实现了同步。

生产者与消费者的作用是:

① 支持忙闲不均,可以提高程序的运行效率。
② 队列中提供了一个缓冲区的作用,可以缓冲待要处理的元素。

3.2 使用条件变量和互斥锁实现生产者和消费者模型

首先要实现生产者与消费者模型,执行123规则即可

因此,我们应该首先定义一个类来实现一个线程安全的队列,为了保证线程安全,我们应在成员变量中给出一个互斥锁变量。为了保证生产者和消费者对资源访问的合理性,我们定义两个条件变量,用来实现同步。

两种角色的线程我们在主线程中进行初始化。

三种关系我们使用一个互斥锁和两个条件变量即可实现。

并且,我们还可以规定,Push操作为生产者线程调用的接口,Pop操作为消费者线程调用的接口。

代码如下:

#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <queue>

using namespace std;

#define PTHREADNUM 4

//实现一个线程安全的队列
class SafeQueue
{
    public:
        //初始化队列
        SafeQueue() : que_(),capacity_(10)
        {
            pthread_mutex_init(&lock_,NULL);
            pthread_cond_init(&prod_,NULL);
            pthread_cond_init(&cons_,NULL);

        }
        //销毁队列
        ~SafeQueue()
        {
            pthread_mutex_destroy(&lock_);
            pthread_cond_destroy(&prod_);
            pthread_cond_destroy(&cons_);
        }

        Linux多线程中互斥锁读写锁自旋锁条件变量信号量详解

Linux篇第十五篇——多线程(生产消费模型+POSIX信号量)

Linux篇第十五篇——多线程(生产消费模型+POSIX信号量)

生产者消费者模型

Linux操作系统多线程

Linux 多线程