如何使用Redis 做队列操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Redis 做队列操作相关的知识,希望对你有一定的参考价值。
redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;
另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可;
redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。
<?php
$redis = getRedisInstance();//从Redis服务器拿到redis实例
$redis->connect('Redis服务器IP', 6379);
while (true)
$redis->lPush('list1', 'A_'.date('Y-m-d H:i:s'));
sleep(rand()%3);
?>
执行# php list_push.php &
出队列操作 list_pop.php文件
<?php
$redis = getRedisInstance();//从Redis服务器拿到redis实例
$redis->pconnect('Redis服务器IP', 6379);
while(true)
try
var_export( $redis->blPop('list1', 10) );
catch(Exception $e)
//echo $e;
实现方法(Python)
1.入队列(write.py)
#!/usr/bin/env python
import time
from redis import Redis
redis = Redis(host='127.0.0.1', port=6379)
while True:
now = time.strftime("%Y/%m/%d %H:%M:%S")
redis.lpush('test_queue', now)
time.sleep(1)
2.出队列(read.py)
#!/usr/bin/env python
import sys
from redis import Redis
redis = Redis(host='127.0.0.1', port=6379)
while True:
res = redis.rpop('test_queue')
if res == None:
pass
else:
print str(res)本回答被提问者采纳
场景应用:Redis如何做消息队列?
文章目录
Redis做消息队列
因为Redis中的list 是双向链表也可以当队列的特性
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
Redis 的 List 和 Stream 两种数据类型,就可以满足消息队列的这三个需求。我们来了解下基于 List 的消息队列实现方法
消息保序
List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
List 可以使用 LPUSH + RPOP (或者反过来,RPUSH+LPOP)命令实现消息队列。
- 生产者使用
LPUSH key value[value...]
将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。 - 消费者使用
RPOP key
依次读取队列的消息,先进先出。
不过,在消费者读取数据时,有一个潜在的性能风险点。
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP
命令(比如使用一个while(1)循环)。如果有新消息写入,RPOP命令就会返回结果,否则,RPOP命令返回空值,再继续循环。
所以,即使没有新消息写入List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。
为了解决这个问题,Redis提供了 BRPOP 命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。
处理重复的消息
消费者要实现重复消息的判断,需要 2 个方面的要求:
- 每个消息都有一个全局的 ID。
- 消费者要记录已经处理过的消息的 ID。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。
但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。
例如,我们执行以下命令,就把一条全局 ID 为 111000111、库存量为 99 的消息插入了消息队列:
> LPUSH mq "111000111:stock:99"
(integer) 1
证消息可靠性
当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。
为了留存消息,List 类型提供了 BRPOPLPUSH
命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。
这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
总结
好了,到这里可以知道基于 List 类型的消息队列,满足消息队列的三大需求(消息保序、处理重复的消息和保证消息可靠性)。
- 消息保序:使用 LPUSH + RPOP;
- 阻塞读取:使用 BRPOP;
- 重复消息处理:生产者自行实现全局唯一 ID;
- 消息的可靠性:使用 BRPOPLPUSH
List 作为消息队列有什么缺陷?
List 不支持多个消费者消费同一条消息,因为一旦消费者拉取一条消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费。
要实现一条消息可以被多个消费者消费,那么就要将多个消费者组成一个消费组,使得多个消费者可以消费同一条消息,但是 List 类型并不支持消费组的实现。
这就要说起 Redis 从 5.0 版本开始提供的 Stream 数据类型了,Stream 同样能够满足消息队列的三大需求,而且它还支持「消费组」形式的消息读取。
以上是关于如何使用Redis 做队列操作的主要内容,如果未能解决你的问题,请参考以下文章
Redis五种数据结构及常用操作指令Redis在JAVA中如何封装使用
停止 Redis 后如何继续执行 celery 队列,然后再启动它?