LinuxPOSIX信号量 | 基于环形队列的生产者消费者模型

Posted 阿亮joy.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LinuxPOSIX信号量 | 基于环形队列的生产者消费者模型相关的知识,希望对你有一定的参考价值。

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根

目录

👉POSIX信号量👈

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。信号量分为二元信号量和多远信号量。二元信号量(Binary Semaphore),即:计数器维护的 value 只有 0 和 1 着两种可能,以此来实现互斥,所以也称为互斥信号量(互斥锁)。1 表示可以访问资源,0 表示不可以访问资源。多元信号量是 value 大于 2 的信号量,以原子的方式对该值进行加加和减减操作。

深入理解信号量

初始化信号量

  • sem:是 sem_t 类型变量的地址,也就是信号量。
  • pshared:0 表示线程间共享,非零表示进程间共享。
  • value:信号量本质是一个计数器,value 就是信号量的初始值。

注:信号量本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。

销毁信号量

等待信号量

  • P 操作:等待信号量,会将信号量的值减 1。

发布信号量

  • V 操作:发布信号量,表示资源使用完毕,可以归还资源了并将信号量值加 1。

注:P 操作和 V 操作都是原子操作。

👉基于环形队列的生产者消费者模型👈

空间资源和数据资源

  • 生产者关注的是空间资源,只有当环形队列中有空间,生产者才能进行生产。消费者关注的是数据资源,只有当环形队列中有数据,消费者才能进行消费。
  • 通过信号量来描述环形队列中的空间资源(space_sem)和数据资源(data_sem)。space_sem 的初始值是为环形队列的容量上限,因为刚开始时环形队列当中全是空间。data_sem 的初始值是 0,因为刚开始环形队列中没有数据。

申请和释放资源

生产者申请空间资源,释放数据资源。

对于生产者来说,生产者每次生产数据前都需要申请 space_sem。

  • 如果 space_sem 不为 0,则申请信号量成功,生产者可以进行生产。
  • 如果 space_sem 为 0,则申请信号量失败,生产者需要在 space_sem 的等待队列中进行阻塞等待,直到环形队列中有新的空间资源,才能被唤醒。

当生产者生产数据后,应该释放 data_sem。

  • 虽然生产者在进行生产前是对 space_sem 进行的 P 操作,但是生产结束后,并不是对 space_sem 进行 V 操作,而是对 data_sem 进行 V 操作。
  • 当生产者生产完数据后,环形队列中就会多了一个数据资源,因此我们要对 data_sem 进行 V 操作。

消费者申请数据资源,释放空间资源。

对于消费者来说,消费者每次消费数据前都要申请 data_sem。

  • 如果 data_sem 不为 0,则申请信号量成功,消费者可以进行消费。
  • 如果 data_sem 为 0,则申请信号量失败,消费者需要在 data_sem 的等待队列中进行阻塞等待,直到环形队列中有新的数据资源,才能被唤醒。

当消费者消费完数据后,需要释放 space_sem。

  • 虽然消费者在进行消费前是对 data_sem 进行的 P 操作,但是消费结束后,并不是对 data_sem 进行 V 操作,而是对 space_sem 进行 V 操作。
  • 当消费者消费完数据后,环形队列中就会多了一个空间资源,因此我们要对 space_sem 进行 V 操作。

两个规则

在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守两个规则。

第一个规则:生产者和消费者不能对同一个位置进行访问。

生产者和消费者对环形队列进行访问时

  • 如果生产者和消费者访问的是环形队列中的同一个位置,那么就相当于生产者和消费者同时对一块临界资源进行访问,这样将可能会出现数据不一致问题。
  • 如果生产者和消费者访问的是环形队列中的不同位置,那么生产者和消费就可以并发地进行生产数据和消费数据,并不会出现数据不一致的问题。

如果保证生产者和消费者不会对环形队列的同一个位置进行访问?

  • 当生产者和消费者将要对环形队列的同一个位置进行访问时,此时的环形队列要么为满,要么为空。
  • 当环形队列为满时,space_sem 为 0,生产者不能对环形队列进行访问,需要在 space_sem 的等待队列中进行阻塞等待。
  • 当环形队列为空时,data_sem 为 0,消费者不能对环形队列进行访问,需要在 data_sem 的等待队列中进行等待。
  • 通过信号量就保证了当生产者和消费者指向环形队列的同一个位置时,生产和消费的串行化过程。同时也保证了当生产者和消费者执行的不是同一个位置时,生产者和消费者可以并发地进行生产和消费,以提高效率。

第二规则:生产者不能将消费者套圈,消费者不能超过生产者。

  • 如果生产者将消费者套圈了,那么就会出现这样的情况:消费者还没有将生产者之前生产的数据消费掉,该数据就被覆盖掉了,这很显然是不允许的。所以当生产者生产了一圈后,再次遇到消费者时,生产者就不能再进行生产了,需要等消费者消费数据后,才能进行生产。
  • 如果消费者超过了生产者,那么就会出现这样的情况:消费者会将之前已经消费过的废弃数据再消费一次,这也是不允许的。所以当消费者消费一圈后,再次遇到生产者,消费者就不能再进行消费了,需要等生产者生产数据后,才能进行消费。
  • 很明显,第二个规则也是通过信号量来保证的。

代码实现

Sem 是对信号量的封装,其构造函数是对信号量进行初始化,析构函数是对信号量进行销毁,同时也将等待信号量和发布信号量分别封装成 P 操作和 V 操作。

// sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_

#include <semaphore.h>

class Sem

public:
    Sem(int value)
    
        sem_init(&_sem, 0, value);
    

    ~Sem()
    
        sem_destroy(&_sem);
    

    void P()
    
        sem_wait(&_sem);
    

    void V()
    
        sem_post(&_sem);
    

private:
    sem_t _sem;
;

#endif

环形队列 RingQueue 是生产者和消费者模型当中的交易场所,我们可以通过 STL 中的 vector 来模拟实现。

// ringQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"

const int DefaultCapacity = 5;

template <class T>
class RingQueue

public:
    RingQueue(int cap = DefaultCapacity)
        : _rq(cap)
        , _cap(cap)
        , _p_pos(0)
        , _c_pos(0)
        , _space_sem(cap)
        , _data_sem(0)
    

    ~RingQueue()
    

    // 生产数据(生产者调用)
    void Push(const T& in)
    
        _space_sem.P();
        _rq[_p_pos++] = in;
        _p_pos %= _cap;
        _data_sem.V();
    

    // 消费数据(消费者调用)
    void Pop(T& out)
    
        _data_sem.P();
        out = _rq[_c_pos++];
        _c_pos %= _cap;
        _space_sem.V();
    

private:
    std::vector<T> _rq;
    int _cap;
    int _p_pos; // 生产位置
    int _c_pos; // 消费位置
    Sem _space_sem; // 空间资源
    Sem _data_sem;  // 数据资源
;

#endif

相关说明:

  • 当不指定环形队列的大小时,将会使用缺省值 DefaultCapacity 作为环形队列的容量上限。
  • 生产者每次生产时,会将生产的数据放在下标为 _p_pos 的位置上;消费者每次消费时,会取出下标为 _c_pos 的位置上的数据进行消费。
  • 生产者和消费者生产和消费结束后,需要对 _p_pos 和 _c_pos 进行加加操作,以标记下一次放入数据的位置和取数据的位置,最好还需要对下标进行模除操作,以达到环形效果。
  • _p_pos 只会由生产者线程更新,_c_pos 只会由消费者线程更新,所以对它们的访问不需要进行保护,因此它们的更新操作可以放在 V 操作之后。

单生产者单消费者

// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"

void* Consumer(void* args)

    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    
        int data;
        rq->Pop(data);
        std::cout << "Comsumer: " << data << std::endl;
    

    return nullptr;


void* Productor(void* args)

    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    
        sleep(1);
        int data = rand() % 100 + 1;
        rq->Push(data);
        std::cout << "Productor: " << data << std::endl;
    

    return nullptr;


int main()

    srand((unsigned int)time(nullptr));

    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, (void*)rq);
    pthread_create(&p, nullptr, Productor, (void*)rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete rq;

    return 0;


多生产者多消费者

有了多个生产者和多消费者,就会存在生产者和生产者之间的竞争关系、消费者和消费者之间的竞争关系,那么我们就需要对临界资源(下标)进行加锁保护。

// RingQueue.hpp
#ifndef _RING_QUEUE_HPP_
#define _RING_QUEUE_HPP_

#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"

const int DefaultCapacity = 5;

template <class T>
class RingQueue

public:
    RingQueue(int cap = DefaultCapacity)
        : _rq(cap)
        , _cap(cap)
        , _p_pos(0)
        , _c_pos(0)
        , _space_sem(cap)
        , _data_sem(0)
    
        pthread_mutex_init(&_plock, nullptr);
        pthread_mutex_init(&_clock, nullptr);
    

    ~RingQueue()
    
        pthread_mutex_destroy(&_plock);
        pthread_mutex_destroy(&_clock);
    

    // 生产数据(生产者调用)
    void Push(const T& in)
    
        // 先申请信号量:先将资源分发给线程
        _space_sem.P();
        pthread_mutex_lock(&_plock);
        // 一定是竞争成功的生产者线程 -- 就一个!
        _rq[_p_pos++] = in;
        _p_pos %= _cap;
        pthread_mutex_unlock(&_plock);
        _data_sem.V();
    

    // 消费数据(消费者调用)
    void Pop(T& out)
    
        _data_sem.P();
        pthread_mutex_lock(&_clock);
        // 一定是竞争成功的消费者线程 -- 就一个!
        out = _rq[_c_pos++];
        _c_pos %= _cap;
        pthread_mutex_unlock(&_clock);
        _space_sem.V();
    

private:
    std::vector<T> _rq;
    int _cap;
    int _p_pos; // 生产位置
    int _c_pos; // 消费位置
    Sem _space_sem; // 空间资源
    Sem _data_sem;  // 数据资源
    pthread_mutex_t _plock;
    pthread_mutex_t _clock;
;

#endif

注意:_p_pos 和 c_pos 的更新需要再加锁和解锁之间。如果它们的更新不在加锁和解锁之间,将可能会出现这样的情况:线程 A 释放了锁彬没来得及将下标进行更新,然后线程 B 就获得了锁并执行到更新下标的地方,这样就有可能会出现数据不一致的问题!

// Test.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include "ringQueue.hpp"

void* Consumer(void* args)

    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    
        sleep(1);
        int data;
        rq->Pop(data);
        std::cout << "Comsumer: " << data << " [" << pthread_self() << "]" << std::endl;
    

    return nullptr;


void* Productor(void* args)

    RingQueue<int>* rq = (RingQueue<int>*)(args);

    while(true)
    
        int data = rand() % 100 + 1;
        rq->Push(data);
        std::cout << "Productor: " << data << " [" << pthread_self() << "]" << std::endl;
    

    return nullptr;


int main()

    srand((unsigned int)time(nullptr));

    RingQueue<int>* rq = new RingQueue<int>();
    pthread_t c[3], p[2];
    pthread_create(c, nullptr, Consumer, (void*)rq);
    pthread_create(c+1, nullptr, Consumer, (void*)rq);
    pthread_create(c+2, nullptr, Consumer, (void*)rq);

    pthread_create(p, nullptr, Productor, (void*)rq);
    pthread_create(p+1, nullptr, Productor, (void*)rq);

    for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
    for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
    delete rq;

    return 0;

多生产多消费的意义

生产的本质是将私有的任务或数据放入到公共空间中,消费的本质是将公共空间中的任务或数据变成私有。生产和消费并不是简单地将任务或数据放入到交易场所或从交易场所中取出,还需要考虑数据或任务放入到交易场所前和拿到任务或数据之后的处理,这两个过程是最耗费时间的。所以,多生产多消费的意义就在于能够并发地生产和处理任务。

信号量的意义

信号量本质是一个计数器,那这个计数器的意义是什么呢?计数器的意义就是不用进入临界区,就可以得知临界资源的情况,甚至可以减少临界区内部的判断!而在基于阻塞队列的生产者和消费者模型中,需要申请锁,然后进行临界资源是否满足的判断再进行访问,最后释放锁,需要进行判断的原因就是我们并不清楚临界资源的情况!而信号量要提前预设资源的情况,在进行 PV 操作的过程中,我们在临界区外部就能得知临界资源的情况。

👉总结👈

本篇博客主要讲解了信号量、信号量的相关函数以及基于环形队列的生产者消费者模型等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

基于信号量与环形队列实现读写异步缓存队列

目录

一、需求 

二、技术思路

三、示例代码


一、需求 

         实现一个读写异步的缓存队列,主要实现功能如下:

        1、缓存队列作为临界资源,要是线程安全的,不会出现读线程与写线程同时操作缓存队列的情况发生。

        2、当缓存队列被塞满以后,写线程阻塞等待读线程读取数据。

        3、当缓存队列空时,读线程需要阻塞等待写线程写入数据。

        4、可指定缓存队列的长度,缓存队列中,存放 byte 类型的数据。

二、技术思路

        基于信号量与环形队列实现。

        参考《现代操作系统》,使用了三个信号量:一个称为full,用来记录充满的缓冲槽数目;一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区 。

        环形队列是为了能够重复利用队列中的空间。当读数据或写数据抵达队列边界时,能够跳到开头衔接上。由于已经使用信号量做队列长度的约束,所以在使用环形队列时可以省不少事,只需要实现首尾跳转即可,无需关心队头与队尾位置的问题。

三、示例代码

        仅作参考,未贴头文件。

queue.c 

#include "queue.h"

typedef struct shared_queue

    sem_t queue_full;
    sem_t queue_empty;
    sem_t queue_lock;

    item *queue;
    int queue_len;
    // ring queue
    int tail;
    int font;

sharq;

int queue_init(sharq *que, int len)

    if (que == NULL)
    
        printf("[error] : the pointer of queue is NULL\\n");
        return ERROR;
    
    que->queue = (item *)malloc(sizeof(item) * len);
    if (que->queue == NULL)
    
        printf("[error] : queue malloc failed\\n");
        return ERROR;
    

    que->queue_len = len;
    que->tail = 0;
    que->font = 0;

    sem_init(&que->queue_full, 0, 0);
    sem_init(&que->queue_empty, 0, len);
    sem_init(&que->queue_lock, 0, 1);

    return OK; 


int queue_write(sharq *que, byte *input, int len_in)

    sem_wait(&que->queue_empty);
    sem_wait(&que->queue_lock);

    // insert
    item *it = que->queue + que->tail;
    it->buff = (byte *)malloc(len_in);
    if (it->buff == NULL)
    
        printf("[error] : item malloc failed\\n");
        return ERROR;
    
    memcpy(it->buff, input, len_in);
    it->buff_len = len_in;

    // ring queue 
    que->tail++;
    que->tail %= que->queue_len;

    sem_post(&que->queue_lock);
    sem_post(&que->queue_full);
    return OK;


int queue_read(sharq *que, byte *output, int len_out)

    sem_wait(&que->queue_full);
    sem_wait(&que->queue_lock);

    // delete
    item *it = que->queue + que->font;
    if (len_out < it->buff_len)
    
        printf("[error] : length of output_buff is lower than item_buff\\n");
        return ERROR;
    
    memcpy(output, it->buff, it->buff_len); 

    // ring queue
    que->font++;
    que->font %= que->queue_len;

    sem_post(&que->queue_lock);
    sem_post(&que->queue_empty);
    return OK;


int queue_destroy(sharq *que)

    if (que == NULL)
    
        printf("[error] : can not destroy NULL queue\\n");
        return ERROR;
    
    sem_destroy(&que->queue_full);
    sem_destroy(&que->queue_empty);
    sem_destroy(&que->queue_lock);

    if (que->queue != NULL)
    
        free(que->queue);
    
    return OK;

main.c 

#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include "./queue/queue.h"

#define TEST_SUM 2000

void *producer(void *args)

    sharq *queue = (sharq *)args;
    // test data
    char strs[50] = "this is a test  ";
    int len = strlen(strs);
    for (int i = 0; i < TEST_SUM; i++)
    
        strs[len - 1] = '0' + i % 10;
        int res = queue_write(queue, (byte *)strs, len);
        if (res == ERROR)
        
            printf("[ERROR] : write error\\n");
        
    
    printf("write Over(%d)!\\n", TEST_SUM);


void *consumer(void *args)

    sharq *queue = (sharq *)args;
    byte *read_buff = (byte *)malloc(BLOCK);
    int count = 0;
    while(1)
    
        bzero(read_buff, BLOCK);
        int res = queue_read(queue, read_buff, BLOCK);
        if (res == ERROR)
        
            printf("[ERROR] : writer failed\\n");
            break;
        
        //printf("str : %s\\n", (char *)read_buff);
        count++;
        if (count == TEST_SUM)
        
            break;
        
    
    printf("read  Over(%d)!\\n", TEST_SUM);
    free(read_buff);


int main(void)

    sharq queue;
    queue_init(&queue, 10); 

    pthread_t pro_t, con_t;

    pthread_create(&pro_t, NULL, producer, &queue);
    pthread_create(&con_t, NULL, consumer, &queue);

    pthread_join(pro_t, NULL);
    pthread_join(con_t, NULL);

    return 0;

四、数据填充

         此处无需再看,只是为了满足CSDN发文助手的文章质量检测。

        以下内容由“废话生成器”生成。

        鲁巴金曾经说过,读书是在别人思想的帮助下,建立起自己的思想。这不禁令我深思。 在这种困难的抉择下,本人思来想去,寝食难安。 我们都知道,只要有意义,那么就必须慎重考虑。 我们都知道,只要有意义,那么就必须慎重考虑。 我认为, 塞涅卡在不经意间这样说过,真正的人生,只有在经过艰难卓绝的斗争之后才能实现。我希望诸位也能好好地体会这句话。 总结的来说, 罗曼·罗兰在不经意间这样说过,只有把抱怨环境的心情,化为上进的力量,才是成功的保证。这启发了我, 维龙在不经意间这样说过,要成功不需要什么特别的才能,只要把你能做的小事做得好就行了。这不禁令我深思。 既然如何, 在这种困难的抉择下,本人思来想去,寝食难安。 检测垃圾的发生,到底需要如何做到,不检测垃圾的发生,又会如何产生。 马云曾经说过,最大的挑战和突破在于用人,而用人最大的突破在于信任人。这句话语虽然很短,但令我浮想联翩。

以上是关于LinuxPOSIX信号量 | 基于环形队列的生产者消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

Linux信号量

基于信号量与环形队列实现读写异步缓存队列

基于信号量与环形队列实现读写异步缓存队列

Linux多线程——生产者消费者模型

LINUX多线程(生产者消费者模型,POXIS信号量)

linux:线程 POSIX信号量&&线程池