队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了队列相关的知识,希望对你有一定的参考价值。
1. 队列概念和结构
队列与栈类似是一种特殊的线性表, 其只允许在一端删除数据, 在另外一端插入数据
删除数据的一端叫做队头, 插入数据的一端叫做队尾
删除与插入操作在队列中, 叫做出队和入队, 如下图
其次, 队列中的所有数据都遵守先进先出原则
比如要将1移出队列, 必须首先将数据0先出队列, 才能将数据1出队
2. 队列的实现
下图是实现队列的结构, 其中QueNode链表结构用来存储数据和记录数据之间的次序
Queue结构作为整体用来实现队列, head指向队头 (链表结构的第一个节点), tail指向队尾(最后一个节点), size记录队列中的数据个数
下面具体实现
队列结构及接口
typedef int QueDataType;
typedef struct QueNode
QueDataType data;
struct QueNode* next;
QueNode;
typedef struct Queue
QueNode* head;
QueNode* tail;
int size;
Queue;
// 初始化队列
void QueInit(Queue* pq);
// 销毁队列
void QueDestroy(Queue* pq);
// 入队
void QuePush(Queue* pq, QueDataType x);
// 出队
void QuePop(Queue* pq);
// 计算队列中数据个数
int QueSize(Queue* pq);
// 判空
bool isQueEmpty(Queue* pq);
// 取队头数据
QueDataType QueFront(Queue* pq);
// 取队尾数据
QueDataType QueBack(Queue* pq);
初始化队列
// 初始化队列
void QueInit(Queue* pq)
assert(pq);
pq->head = pq->tail = NULL;
pq->size = 0;
入队
// 入队
void QuePush(Queue* pq, QueDataType x)
assert(pq);
QueNode* newnode = (QueNode*)malloc(sizeof(QueNode));
if (NULL == newnode)
perror("QuePush::malloc fail");
return;
newnode->data = x;
newnode->next = NULL;
if (NULL == pq->head)
pq->head = pq->tail = newnode;
else
pq->tail->next = newnode;
pq->tail = newnode;
pq->size++;
出队
// 出队
void QuePop(Queue* pq)
assert(pq);
assert(pq->head);
if (NULL == pq->head->next)
free(pq->head);
pq->head = pq->tail = NULL;
else
QueNode* next = pq->head->next;
free(pq->head);
pq->head = next;
pq->size--;
计算队中数据个数
// 计算队列中数据个数
int QueSize(Queue* pq)
return pq->size;
判断队列是否为空
// 判空
bool isQueEmpty(Queue* pq)
return pq->size == 0;
取队头/尾数据
// 取队头数据
QueDataType QueFront(Queue* pq)
assert(pq);
assert(!isQueEmpty(pq));
return pq->head->data;
// 取队尾数据
QueDataType QueBack(Queue* pq)
assert(pq);
assert(!isQueEmpty(pq));
return pq->tail->data;
销毁队列
// 销毁队列
void QueDestroy(Queue* pq)
assert(pq);
QueNode* cur = pq->head;
while (cur)
QueNode* next = cur->next;
free(cur);
cur = next;
pq->head = pq->tail = NULL;
pq->size = 0;
完整代码
queue.h
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
typedef int QueDataType;
typedef struct QueNode
QueDataType data;
struct QueNode* next;
QueNode;
typedef struct Queue
QueNode* head;
QueNode* tail;
int size;
Queue;
// 初始化队列
void QueInit(Queue* pq);
// 销毁队列
void QueDestroy(Queue* pq);
// 入队
void QuePush(Queue* pq, QueDataType x);
// 出队
void QuePop(Queue* pq);
// 计算队列中数据个数
int QueSize(Queue* pq);
// 判空
bool isQueEmpty(Queue* pq);
// 取队头数据
QueDataType QueFront(Queue* pq);
// 取队尾数据
QueDataType QueBack(Queue* pq);
queue.c
#include "queue.h"
// 初始化队列
void QueInit(Queue* pq)
assert(pq);
pq->head = pq->tail = NULL;
pq->size = 0;
// 销毁队列
void QueDestroy(Queue* pq)
assert(pq);
QueNode* cur = pq->head;
while (cur)
QueNode* next = cur->next;
free(cur);
cur = next;
pq->head = pq->tail = NULL;
pq->size = 0;
// 入队
void QuePush(Queue* pq, QueDataType x)
assert(pq);
QueNode* newnode = (QueNode*)malloc(sizeof(QueNode));
if (NULL == newnode)
perror("QuePush::malloc fail");
return;
newnode->data = x;
newnode->next = NULL;
if (NULL == pq->head)
pq->head = pq->tail = newnode;
else
pq->tail->next = newnode;
pq->tail = newnode;
pq->size++;
// 出队
void QuePop(Queue* pq)
assert(pq);
assert(pq->head);
if (NULL == pq->head->next)
free(pq->head);
pq->head = pq->tail = NULL;
else
QueNode* next = pq->head->next;
free(pq->head);
pq->head = next;
pq->size--;
// 计算队列中数据个数
int QueSize(Queue* pq)
return pq->size;
// 判空
bool isQueEmpty(Queue* pq)
return pq->size == 0;
// 取队头数据
QueDataType QueFront(Queue* pq)
assert(pq);
assert(!isQueEmpty(pq));
return pq->head->data;
// 取队尾数据
QueDataType QueBack(Queue* pq)
assert(pq);
assert(!isQueEmpty(pq));
return pq->tail->data;
test.c
#include "queue.h"
int main()
Queue que;
QueInit(&que);
QuePush(&que, 1);
QuePush(&que, 2);
QuePush(&que, 3);
QuePush(&que, 4);
QuePush(&que, 5);
while (!isQueEmpty(&que))
printf("%d ", QueFront(&que));
QuePop(&que);
QueDestroy(&que);
延时队列常用实现详解
参考技术A 队列是一种线性表,内部的元素是有序的,具有先进先出的特性。
延时队列,顾名思义,它是一个队列,但更重要的是具有延时的特性,与普通队列的先进先出不同,延时队列可以指定队列中的消息在某个时间点被消费。
DelayQueue是无界的延时阻塞队列,内部是使用优先级队列PriorityQueue实现的,其是按时间来定优先级的延时阻塞队列,只有在延迟期满时才能从队列中提取元素,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素, 当执行队列take操作元素未过期时会阻塞当前线程到元素过期为止 ;PriorityQueue是通过二叉小顶堆实现, 其任意一个非叶子节点的权值,都不大于其左右子节点的权值。
示例
队列中的元素必须实现Delayed接口
redis key的过期事件是通过redis 2.8.0之后版本提供的订阅发布功能(pub/sub)下发的,当key过期后系统自动Pub,应用程序只需订阅(sub)该事件即可。
实现步骤
示例
存在的问题
key的失效通知无法保证时效性。redis过期策略有一下三种:
默认情况下,Redis 使用的是 惰性删除 + 定期删除 的策略;每隔一段时间(可通过hz参数设置每秒执行的次数),Redis 会分别从各个库随机选取部分测试设置了过期时间的 Key,判断它们是否过期,过期则删除;如果 key 已过期,但没有被定期删除,由于惰性删除策略,在下次请求获取该数据时会将该数据删除。
可通过如下方式提高时效性
redis zset 结构是一个有序集合,每个元素都会关联一个 double 类型的分数,通过分数来为集合中的成员进行从小到大的排序;有序集合的成员是唯一的,但分数(score)却可以重复。
实现思路
将任务id作为member,到期时间作为score存入到zset中,然后不断轮询获取第一个元素,判断其是否过期,过期后删除并执行任务即可。
也可以通过lua脚本将 zrangebyscore 和 zrem 操作变成原子操作,避免了多线程时同一个me mber多次zrem。
存在的问题
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过ttl及dlx(Dead Letter Exchanges)特性模拟出延迟队列的功能。
绑定在死信交换机上的队列。RabbitMQ的Queue(队列)可以配置两个参数x-dead-letter-exchange(死信交换机)和x-dead-letter-routing-key(指定routing-key发送,可选),当消息在一个队列中变成死信 (dead message) 之后,按照这两个参数可以将消息重新路由到另一个DLX Exchange(死信交换机),让消息重新被消费。
队列出现Dead Letter的情况有:
RabbitMQ可以对消息和队列设置TTL,为队列设置时,队列中所有消息都有相同的过期时间;对消息进行单独设置,每条消息过期时间可以不同;如果同时设置了队列的ttl和消息的ttl以两者之间TTL较小的那个数值为准。消息超过设置的ttl值未被消费,将会变为死信,消费者将无法再收到该消息。
ttl消息按照入发送顺序排列在队列中,且rabbitMQ只会判断队列头消息是否失效,失效后才会加入到死信队列中,如果发送多个过期时间不一致的消息,有可能后面的消息已经过期了,但队列头消息没有过期,导致其他消息不能及时加入到死信队列被消费。
针对上述的问题,可以使用 rabbitmq_delayed_message_exchang 插件来解决。
安装该插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia (一个分布式数据系统)表中,检测消息延迟时间(通过消息头的x-delay指定),如达到可投递时间时并将其通过 x-delayed-type 类型标记的交换机类型投递至目标队列。
插件的安装
使用示例
插件的局限
时间轮的应用广泛,包括linux内核的调度、zookeeper、netty、kafka、xxl-job、quartz等均有使用时间轮。
图中的圆盘可以看作是钟表的刻度。比如一圈round长度为24秒,刻度数为8,那么每一个刻度表示3秒。那么时间精度就是3秒。每个刻度为一个bucket(实际上就是TimerTaskList),TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务;根据每个TimerTaskEntry的过期时间和当前时间轮的时间,选择一个合适的bucket,把这个TimerTaskEntry对象放进去;对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的;二是多层时间轮,Kakfa 是这样实现的。
下面介绍下kafka的多层时间轮,层数越高时间跨度越大。
每个使用到的TimerTaskList都会加入到DelayQueue中,DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头,通过一个线程获取到DelayQueue中的超时的任务列表TimerTaskList之后,既可以根据TimerTaskList的expiration来推进时间轮的时间,也可以就获取到的TimerTaskList执行相应的操作,TimerTaskEntry该执行过期操作的就执行过期操作,该降级时间轮的就降级时间轮。
假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。
以上是关于队列的主要内容,如果未能解决你的问题,请参考以下文章