[Linux 高并发服务器]生产者与消费者模型

Posted 鱼竿钓鱼干

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Linux 高并发服务器]生产者与消费者模型相关的知识,希望对你有一定的参考价值。

[Linux 高并发服务器]生产者与消费者模型

文章概述

该文章为牛客网C++项目课程:Linux高并发服务器的个人笔记,记录了生产者与消费者模型的一些知识点

作者信息

NEFU 2020级 zsl
ID:fishingrod/鱼竿钓鱼干
Email:851892190@qq.com
欢迎各位引用此博客,引用时在显眼位置放置原文链接和作者基本信息

参考资料

感谢前辈们留下的优秀资料,从中学到很多,冒昧引用,如有冒犯可以私信或者在评论区下方指出

标题作者引用处
Linux高并发服务器牛客网贯穿全文以此为基础
生产者-消费者模型:理论讲解及实现(C++)HOracle生产与消费者模型概念

正文部分

原本acwing上学thrift框架(一个RPC框架)的时候听说过生产者与消费者模型,不过当时没学这些玩意也就迷迷糊糊的过去了。

生产者与消费者模型

生产者-消费者模型:理论讲解及实现(C++)
这篇博客讲的蛮好的,强烈建议看看,能回答下面几个问题即可


该图摘自HOcrale大佬的博客

  1. 什么是生产者-消费者模型
  2. 为什么要使用生产者-消费者模型
  3. 生产者-消费模型的特点
  4. 生产者-消费者模型的优点
  5. 生产者-消费者模型的应用场景
  6. 为什么要在生产者消费者中间设置缓冲区(不设置会有什么问题)

CODE

下面这些代码没有处理容器满的情况,只是处理了没货的情况

伪代码

// 创建一个互斥量
pthread_mutex_t mutex;

//容器(缓冲区)Pool

void * producer(void * arg) {
    while(1) {
        //加锁
        pthread_mutex_lock(&mutex);
        //容器里加货
		
		//解锁
        pthread_mutex_unlock(&mutex);
    }
	return NULL;
}

void * customer(void * arg) {

    while(1) {
      	//加锁
      	pthread_mutex_lock(&mutex);

        // 看有没有货
        if(head != NULL) {// 有货
            //消费
            pthread_mutex_unlock(&mutex);
            usleep(100);
        } else {// 没货          
            // 解锁
            pthread_mutex_unlock(&mutex);
        }
    }
    return  NULL;
}

版本一

/*
    生产者消费者模型(粗略的版本)
*/
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

// 创建一个互斥量
pthread_mutex_t mutex;

struct Node{
    int num;
    struct Node *next;
};

// 头结点
struct Node * head = NULL;

void * producer(void * arg) {

    // 不断的创建新的节点,添加到链表中
    while(1) {
        pthread_mutex_lock(&mutex);
        struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
        newNode->next = head;
        head = newNode;
        newNode->num = rand() % 1000;
        printf("add node, num : %d, tid : %ld\\n", newNode->num, pthread_self());
        pthread_mutex_unlock(&mutex);
        usleep(100);
    }

    return NULL;
}

void * customer(void * arg) {

    while(1) {
        pthread_mutex_lock(&mutex);
        // 保存头结点的指针
        struct Node * tmp = head;

        // 判断是否有数据
        if(head != NULL) {
            // 有数据
            head = head->next;
            printf("del node, num : %d, tid : %ld\\n", tmp->num, pthread_self());
            free(tmp);
            pthread_mutex_unlock(&mutex);
            usleep(100);
        } else {
            // 没有数据
            pthread_mutex_unlock(&mutex);
        }
    }
    return  NULL;
}

int main() {

    pthread_mutex_init(&mutex, NULL);

    // 创建5个生产者线程,和5个消费者线程
    pthread_t ptids[5], ctids[5];

    for(int i = 0; i < 5; i++) {
        pthread_create(&ptids[i], NULL, producer, NULL);
        pthread_create(&ctids[i], NULL, customer, NULL);
    }

    for(int i = 0; i < 5; i++) {
        pthread_detach(ptids[i]);
        pthread_detach(ctids[i]);
    }

    while(1) {
        sleep(10);
    }

    pthread_mutex_destroy(&mutex);

    pthread_exit(NULL);

    return 0;
}

改进与优化

条件变量优化

在版本一中的代码中,消费者模型的代码如下

void * customer(void * arg) {

    while(1) {
        pthread_mutex_lock(&mutex);
        // 保存头结点的指针
        struct Node * tmp = head;

        // 判断是否有数据
        if(head != NULL) {
            // 有数据
            head = head->next;
            printf("del node, num : %d, tid : %ld\\n", tmp->num, pthread_self());
            free(tmp);
            pthread_mutex_unlock(&mutex);
            usleep(100);
        } else {
            // 没有数据
            pthread_mutex_unlock(&mutex);
        }
    }
    return  NULL;
}

可以发现,如果没有数据,那么消费者只是进行了单纯的解锁。
这样有可能会出现,锁老是被消费者解锁了又拿回去造成很多次没有必要的判断。
为了优化这点,我们考虑下面方案:
如果消费者发现没数据了,那就阻塞在那里,释放锁给生产者去生产,这样能避免多余的判断,条件变量就是干这个的。

在容器空的时候,告诉消费者线程:你要等到生产者线程生产出东西,并且把锁释放给他
而对于生产者线程:你生产完产品要告诉消费者线程可以开始消费了(唤醒)

/*
    条件变量的类型 pthread_cond_t
    int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
    int pthread_cond_destroy(pthread_cond_t *cond);
    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
        - 等待,调用了该函数,线程会阻塞。
    int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
        - 等待多长时间,调用了这个函数,线程会阻塞,直到指定的时间结束。
    int pthread_cond_signal(pthread_cond_t *cond);
        - 唤醒一个或者多个等待的线程
    int pthread_cond_broadcast(pthread_cond_t *cond);
        - 唤醒所有的等待的线程
*/
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

// 创建一个互斥量
pthread_mutex_t mutex;
// 创建条件变量
pthread_cond_t cond;

struct Node{
    int num;
    struct Node *next;
};

// 头结点
struct Node * head = NULL;

void * producer(void * arg) {

    // 不断的创建新的节点,添加到链表中
    while(1) {
        pthread_mutex_lock(&mutex);
        struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
        newNode->next = head;
        head = newNode;
        newNode->num = rand() % 1000;
        printf("add node, num : %d, tid : %ld\\n", newNode->num, pthread_self());
        
        // 只要生产了一个,就通知消费者消费
        pthread_cond_signal(&cond);

        pthread_mutex_unlock(&mutex);
        usleep(100);
    }

    return NULL;
}

void * customer(void * arg) {

    while(1) {
        pthread_mutex_lock(&mutex);
        // 保存头结点的指针
        struct Node * tmp = head;
        // 判断是否有数据
        if(head != NULL) {
            // 有数据
            head = head->next;
            printf("del node, num : %d, tid : %ld\\n", tmp->num, pthread_self());
            free(tmp);
            pthread_mutex_unlock(&mutex);
            usleep(100);
        } else {
            // 没有数据,需要等待
            // 当这个函数调用阻塞的时候,会对互斥锁进行解锁,当不阻塞的,继续向下执行,会重新加锁。
            pthread_cond_wait(&cond, &mutex);
            pthread_mutex_unlock(&mutex);
        }
    }
    return  NULL;
}

int main() {

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);

    // 创建5个生产者线程,和5个消费者线程
    pthread_t ptids[5], ctids[5];

    for(int i = 0; i < 5; i++) {
        pthread_create(&ptids[i], NULL, producer, NULL);
        pthread_create(&ctids[i], NULL, customer, NULL);
    }

    for(int i = 0; i < 5; i++) {
        pthread_detach(ptids[i]);
        pthread_detach(ctids[i]);
    }

    while(1) {
        sleep(10);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

    pthread_exit(NULL);

    return 0;
}

信号量实现

信号量应该对标互斥锁的一个玩意

参考这个知乎问题下的回答
作者ID是:胖子
不知道为什么没法直接设置回答的链接,所以只能这样了,下面是我个人理解

多线程并发的两个基本需求是同步和等待
同步:多线程共享同一个资源,同时访问和使用,线程之间需要同步协调,这可以使用互斥锁来解决

等待:多个线程之间有依赖关系就需要等待,比如生产者消费者模型里消费者要等待生产者产出数据才能进行消费。

解决等待最朴素的方法就是轮询(poll),每隔一段时间看一下情况
版本一的代码就是轮询,只不过没设置时间间隔,轮询的缺点是可能或做很多重复的判断。
为了解决这一问题,我们使用了条件变量来进行优化。让消费者线程在没有数据的时候阻塞在那里,等待生产者生产。

接下来谈谈信号量,我们可以认为信号量=互斥锁+条件标量,是一个可以实现同步和等待的机制。我们可以把信号量解释为当前可用公共资源数。想象一个停车场,车位数就是最大容量,当前剩余空车位数就是当前可用公共资源,进去一辆信号量-1,出来一辆信号量+1,如果没有空车位了那么信号量为0,线程阻塞。

/*
    信号量的类型 sem_t
    int sem_init(sem_t *sem, int pshared, unsigned int value);
        - 初始化信号量
        - 参数:
            - sem : 信号量变量的地址
            - pshared : 0 用在线程间 ,非0 用在进程间
            - value : 信号量中的值

    int sem_destroy(sem_t *sem);
        - 释放资源

    int sem_wait(sem_t *sem);
        - 对信号量加锁,调用一次对信号量的值-1,如果值为0,就阻塞

    int sem_trywait(sem_t *sem);

    int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
    int sem_post(sem_t *sem);
        - 对信号量解锁,调用一次对信号量的值+1

    int sem_getvalue(sem_t *sem, int *sval);

    sem_t psem;
    sem_t csem;
    init(psem, 0, 8);
    init(csem, 0, 0);

    producer() {
        sem_wait(&psem);
        sem_post(&csem)
    }

    customer() {
        sem_wait(&csem);
        sem_post(&psem)
    }

*/

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>

// 创建一个互斥量
pthread_mutex_t mutex;
// 创建两个信号量
sem_t psem;
sem_t csem;

struct Node{
    int num;
    struct Node *next;
};

// 头结点
struct Node * head = NULL;

void * producer(void * arg) {

    // 不断的创建新的节点,添加到链表中
    while(1) {
        sem_wait(&psem);
        pthread_mutex_lock(&mutex);
        struct Node * newNode = (struct Node *)malloc(sizeof(struct Node));
        newNode->next = head;
        head = newNode;
        newNode->num = rand() % 1000;
        printf("add node, num : %d, tid : %ld\\n", newNode->num, pthread_self());
        pthread_mutex_unlock(&mutex);
        sem_post(&csem);
    }

    return NULL;
}

void * customer(void * arg) {

    while(1) {
        sem_wait(&csem);
        pthread_mutex_lock(&mutex);
        // 保存头结点的指针
        struct Node * tmp = head;
        head = head->next;
        printf("del node, num : %d, tid : %ld\\n", tmp->num, pthread_self());
        free(tmp);
        pthread_mutex_unlock(&mutex);
        sem_post(&psem);
       
    }
    return  NULL;
}

int main() {

    pthread_mutex_init(&mutex, NULL);
    sem_init(&psem, 0, 8);
    sem_init(&csem, 0, 0);

    // 创建5个生产者线程,和5个消费者线程
    pthread_t ptids[5], ctids[5];

    for(int i = 0; i < 5; i++) {
        pthread_create(&ptids[i], NULL, producer, NULL);
        pthread_create(&ctids[i], NULL, customer, NULL);
    }

    for(int i = 0; i < 5; i++) {
        pthread_detach(ptids[i]);
        pthread_detach(ctids[i]);
    }

    while(1) {
        sleep(10);
    }

    pthread_mutex_destroy(&mutex);

    pthread_exit(NULL);

    return 0;
}

以上是关于[Linux 高并发服务器]生产者与消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

《重学Java高并发》线程与线程之间如何协作(父子线程如何优雅交互)

《重学Java高并发》线程与线程之间如何协作(父子线程如何优雅交互)

《重学Java高并发》同步转异步编程技巧与实战运用

《重学Java高并发》同步转异步编程技巧与实战运用

并发编程之多进程3 (生产者与消费者模型) 回调函数

生产消费者模式,并不是高并发模式