队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了队列相关的知识,希望对你有一定的参考价值。

1. 队列概念和结构

队列与栈类似是一种特殊的线性表, 其只允许在一端删除数据, 在另外一端插入数据

删除数据的一端叫做队头, 插入数据的一端叫做队尾

删除与插入操作在队列中, 叫做出队和入队, 如下图

其次, 队列中的所有数据都遵守先进先出原则

比如要将1移出队列, 必须首先将数据0先出队列, 才能将数据1出队

 

2. 队列的实现

下图是实现队列的结构, 其中QueNode链表结构用来存储数据和记录数据之间的次序

Queue结构作为整体用来实现队列, head指向队头 (链表结构的第一个节点), tail指向队尾(最后一个节点), size记录队列中的数据个数

下面具体实现

 

队列结构及接口

typedef int QueDataType;
typedef struct QueNode

	QueDataType data;
	struct QueNode* next;
QueNode;

typedef struct Queue

	QueNode* head;
	QueNode* tail;
	int size;
Queue;

// 初始化队列
void QueInit(Queue* pq);
// 销毁队列
void QueDestroy(Queue* pq);
// 入队
void QuePush(Queue* pq, QueDataType x);
// 出队
void QuePop(Queue* pq);
// 计算队列中数据个数
int QueSize(Queue* pq);
// 判空
bool isQueEmpty(Queue* pq);
// 取队头数据
QueDataType QueFront(Queue* pq);
// 取队尾数据
QueDataType QueBack(Queue* pq);

初始化队列

// 初始化队列
void QueInit(Queue* pq)

	assert(pq);
	pq->head = pq->tail = NULL;
	pq->size = 0;

入队

// 入队
void QuePush(Queue* pq, QueDataType x)

	assert(pq);
	QueNode* newnode = (QueNode*)malloc(sizeof(QueNode));
	if (NULL == newnode)
	
		perror("QuePush::malloc fail");
		return;
	
	newnode->data = x;
	newnode->next = NULL;

	if (NULL == pq->head)
	
		pq->head = pq->tail = newnode;
	
	else
	
		pq->tail->next = newnode;
		pq->tail = newnode;
	
	pq->size++;

出队

// 出队
void QuePop(Queue* pq)

	assert(pq);
	assert(pq->head);
	if (NULL == pq->head->next)
	
		free(pq->head);
		pq->head = pq->tail = NULL;
	
	else
	
		QueNode* next = pq->head->next;
		free(pq->head);
		pq->head = next;
	
	pq->size--;

计算队中数据个数

// 计算队列中数据个数
int QueSize(Queue* pq)

	return pq->size;

 

判断队列是否为空

// 判空
bool isQueEmpty(Queue* pq)

	return pq->size == 0;

 

取队头/尾数据

// 取队头数据
QueDataType QueFront(Queue* pq)

	assert(pq);
	assert(!isQueEmpty(pq));
	return pq->head->data;

// 取队尾数据
QueDataType QueBack(Queue* pq)

	assert(pq);
	assert(!isQueEmpty(pq));
	return pq->tail->data;

 

销毁队列 

// 销毁队列
void QueDestroy(Queue* pq)

	assert(pq);
	QueNode* cur = pq->head;
	while (cur)
	
		QueNode* next = cur->next;
		free(cur);
		cur = next;
	
	pq->head = pq->tail = NULL;
	pq->size = 0;

完整代码

queue.h
 #include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>

typedef int QueDataType;

typedef struct QueNode

	QueDataType data;
	struct QueNode* next;
QueNode;

typedef struct Queue

	QueNode* head;
	QueNode* tail;
	int size;
Queue;

// 初始化队列
void QueInit(Queue* pq);
// 销毁队列
void QueDestroy(Queue* pq);
// 入队
void QuePush(Queue* pq, QueDataType x);
// 出队
void QuePop(Queue* pq);
// 计算队列中数据个数
int QueSize(Queue* pq);
// 判空
bool isQueEmpty(Queue* pq);
// 取队头数据
QueDataType QueFront(Queue* pq);
// 取队尾数据
QueDataType QueBack(Queue* pq);
queue.c
 #include "queue.h"

// 初始化队列
void QueInit(Queue* pq)

	assert(pq);
	pq->head = pq->tail = NULL;
	pq->size = 0;

// 销毁队列
void QueDestroy(Queue* pq)

	assert(pq);
	QueNode* cur = pq->head;
	while (cur)
	
		QueNode* next = cur->next;
		free(cur);
		cur = next;
	
	pq->head = pq->tail = NULL;
	pq->size = 0;

// 入队
void QuePush(Queue* pq, QueDataType x)

	assert(pq);
	QueNode* newnode = (QueNode*)malloc(sizeof(QueNode));
	if (NULL == newnode)
	
		perror("QuePush::malloc fail");
		return;
	
	newnode->data = x;
	newnode->next = NULL;

	if (NULL == pq->head)
	
		pq->head = pq->tail = newnode;
	
	else
	
		pq->tail->next = newnode;
		pq->tail = newnode;
	
	pq->size++;

// 出队
void QuePop(Queue* pq)

	assert(pq);
	assert(pq->head);
	if (NULL == pq->head->next)
	
		free(pq->head);
		pq->head = pq->tail = NULL;
	
	else
	
		QueNode* next = pq->head->next;
		free(pq->head);
		pq->head = next;
	
	pq->size--;

// 计算队列中数据个数
int QueSize(Queue* pq)

	return pq->size;

// 判空
bool isQueEmpty(Queue* pq)

	return pq->size == 0;

// 取队头数据
QueDataType QueFront(Queue* pq)

	assert(pq);
	assert(!isQueEmpty(pq));
	return pq->head->data;

// 取队尾数据
QueDataType QueBack(Queue* pq)

	assert(pq);
	assert(!isQueEmpty(pq));
	return pq->tail->data;
test.c
 #include "queue.h"
int main()

	Queue que;
	QueInit(&que);
	QuePush(&que, 1);
	QuePush(&que, 2);
	QuePush(&que, 3);
	QuePush(&que, 4);
	QuePush(&que, 5);
	while (!isQueEmpty(&que))
	
		printf("%d ", QueFront(&que));
		QuePop(&que);
	
	QueDestroy(&que);

 

延时队列常用实现详解

参考技术A

队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。

DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素, 当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止 ;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。

示例
队列中的元素必须实现Delayed接口

redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。

实现步骤

示例

存在的问题

key的失效通知无法保证时效性。redis过期策略有一下三种:

默认情况下,Redis 使用的是 惰性删除 + 定期删除 的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。

可通过如下方式提高时效性

redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。

实现思路

将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。

也可以通过lua脚本将 zrangebyscore 和 zrem 操作变成原子操作,避免了多线程时同一个me mber多次zrem。

存在的问题

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。

绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message) 之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。

队列出现Dead Letter的情况有:

RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。

ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。

针对上述的问题,可以使用 rabbitmq_delayed_message_exchang 插件来解决。

安装该插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过 x-delayed-type 类型标记的交换机类型投递至目标队列。

插件的安装

使用示例

插件的局限

时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。

图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。

下面介绍下kafka的多层时间轮,层数越高时间跨度越大。

每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。

假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。

以上是关于队列的主要内容,如果未能解决你的问题,请参考以下文章

同步,异步,串行队列,并发队列,全局队列,主队列等概念的总结

消息队列 - 死信、延迟、重试队列

rabbitmq死信队列及延迟队列

阻塞队列和线程池原理

延时队列常用实现详解

阻塞队列