基于信号量与环形队列实现读写异步缓存队列
Posted 我要出家当道士
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于信号量与环形队列实现读写异步缓存队列相关的知识,希望对你有一定的参考价值。
目录
一、需求
实现一个读写异步的缓存队列,主要实现功能如下:
1、缓存队列作为临界资源,要是线程安全的,不会出现读线程与写线程同时操作缓存队列的情况发生。
2、当缓存队列被塞满以后,写线程阻塞等待读线程读取数据。
3、当缓存队列空时,读线程需要阻塞等待写线程写入数据。
4、可指定缓存队列的长度,缓存队列中,存放 byte 类型的数据。
二、技术思路
基于信号量与环形队列实现。
参考《现代操作系统》,使用了三个信号量:一个称为full,用来记录充满的缓冲槽数目;一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的数目,mutex初值为1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区 。
环形队列是为了能够重复利用队列中的空间。当读数据或写数据抵达队列边界时,能够跳到开头衔接上。由于已经使用信号量做队列长度的约束,所以在使用环形队列时可以省不少事,只需要实现首尾跳转即可,无需关心队头与队尾位置的问题。
三、示例代码
仅作参考,未贴头文件。
queue.c
#include "queue.h"
typedef struct shared_queue
sem_t queue_full;
sem_t queue_empty;
sem_t queue_lock;
item *queue;
int queue_len;
// ring queue
int tail;
int font;
sharq;
int queue_init(sharq *que, int len)
if (que == NULL)
printf("[error] : the pointer of queue is NULL\\n");
return ERROR;
que->queue = (item *)malloc(sizeof(item) * len);
if (que->queue == NULL)
printf("[error] : queue malloc failed\\n");
return ERROR;
que->queue_len = len;
que->tail = 0;
que->font = 0;
sem_init(&que->queue_full, 0, 0);
sem_init(&que->queue_empty, 0, len);
sem_init(&que->queue_lock, 0, 1);
return OK;
int queue_write(sharq *que, byte *input, int len_in)
sem_wait(&que->queue_empty);
sem_wait(&que->queue_lock);
// insert
item *it = que->queue + que->tail;
it->buff = (byte *)malloc(len_in);
if (it->buff == NULL)
printf("[error] : item malloc failed\\n");
return ERROR;
memcpy(it->buff, input, len_in);
it->buff_len = len_in;
// ring queue
que->tail++;
que->tail %= que->queue_len;
sem_post(&que->queue_lock);
sem_post(&que->queue_full);
return OK;
int queue_read(sharq *que, byte *output, int len_out)
sem_wait(&que->queue_full);
sem_wait(&que->queue_lock);
// delete
item *it = que->queue + que->font;
if (len_out < it->buff_len)
printf("[error] : length of output_buff is lower than item_buff\\n");
return ERROR;
memcpy(output, it->buff, it->buff_len);
// ring queue
que->font++;
que->font %= que->queue_len;
sem_post(&que->queue_lock);
sem_post(&que->queue_empty);
return OK;
int queue_destroy(sharq *que)
if (que == NULL)
printf("[error] : can not destroy NULL queue\\n");
return ERROR;
sem_destroy(&que->queue_full);
sem_destroy(&que->queue_empty);
sem_destroy(&que->queue_lock);
if (que->queue != NULL)
free(que->queue);
return OK;
main.c
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include "./queue/queue.h"
#define TEST_SUM 2000
void *producer(void *args)
sharq *queue = (sharq *)args;
// test data
char strs[50] = "this is a test ";
int len = strlen(strs);
for (int i = 0; i < TEST_SUM; i++)
strs[len - 1] = '0' + i % 10;
int res = queue_write(queue, (byte *)strs, len);
if (res == ERROR)
printf("[ERROR] : write error\\n");
printf("write Over(%d)!\\n", TEST_SUM);
void *consumer(void *args)
sharq *queue = (sharq *)args;
byte *read_buff = (byte *)malloc(BLOCK);
int count = 0;
while(1)
bzero(read_buff, BLOCK);
int res = queue_read(queue, read_buff, BLOCK);
if (res == ERROR)
printf("[ERROR] : writer failed\\n");
break;
//printf("str : %s\\n", (char *)read_buff);
count++;
if (count == TEST_SUM)
break;
printf("read Over(%d)!\\n", TEST_SUM);
free(read_buff);
int main(void)
sharq queue;
queue_init(&queue, 10);
pthread_t pro_t, con_t;
pthread_create(&pro_t, NULL, producer, &queue);
pthread_create(&con_t, NULL, consumer, &queue);
pthread_join(pro_t, NULL);
pthread_join(con_t, NULL);
return 0;
四、数据填充
此处无需再看,只是为了满足CSDN发文助手的文章质量检测。
以下内容由“废话生成器”生成。
鲁巴金曾经说过,读书是在别人思想的帮助下,建立起自己的思想。这不禁令我深思。 在这种困难的抉择下,本人思来想去,寝食难安。 我们都知道,只要有意义,那么就必须慎重考虑。 我们都知道,只要有意义,那么就必须慎重考虑。 我认为, 塞涅卡在不经意间这样说过,真正的人生,只有在经过艰难卓绝的斗争之后才能实现。我希望诸位也能好好地体会这句话。 总结的来说, 罗曼·罗兰在不经意间这样说过,只有把抱怨环境的心情,化为上进的力量,才是成功的保证。这启发了我, 维龙在不经意间这样说过,要成功不需要什么特别的才能,只要把你能做的小事做得好就行了。这不禁令我深思。 既然如何, 在这种困难的抉择下,本人思来想去,寝食难安。 检测垃圾的发生,到底需要如何做到,不检测垃圾的发生,又会如何产生。 马云曾经说过,最大的挑战和突破在于用人,而用人最大的突破在于信任人。这句话语虽然很短,但令我浮想联翩。
以上是关于基于信号量与环形队列实现读写异步缓存队列的主要内容,如果未能解决你的问题,请参考以下文章
Linux多线程_(Posix信号量实现环形队列生产者消费者模型)