如何使用Redis 做队列操作

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Redis 做队列操作相关的知识,希望对你有一定的参考价值。

    redis设计用来做缓存的,但是由于它自身的某种特性使得它可以用来做消息队列,它有几个阻塞式的API可以使用,正是这些阻塞式的API让其有能力做消息队列;

    另外,做消息队列的其他特性例如FIFO(先入先出)也很容易实现,只需要一个list对象从头取数据,从尾部塞数据即可;

    redis能做消息队列还得益于其list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口,它们都是阻塞版的,所以可以用来做消息队列。

参考技术A 入队列操作文件 list_push.php

<?php
$redis = getRedisInstance();//从Redis服务器拿到redis实例
$redis->connect('Redis服务器IP', 6379);
while (true)
$redis->lPush('list1', 'A_'.date('Y-m-d H:i:s'));
sleep(rand()%3);

?>

执行# php list_push.php &

出队列操作 list_pop.php文件
<?php
$redis = getRedisInstance();//从Redis服务器拿到redis实例
$redis->pconnect('Redis服务器IP', 6379);
while(true)
try
var_export( $redis->blPop('list1', 10) );
catch(Exception $e)
//echo $e;




实现方法(Python)

1.入队列(write.py)

#!/usr/bin/env python

import time

from redis import Redis

redis = Redis(host='127.0.0.1', port=6379)

while True:

now = time.strftime("%Y/%m/%d %H:%M:%S")

redis.lpush('test_queue', now)

time.sleep(1)

2.出队列(read.py)

#!/usr/bin/env python

import sys

from redis import Redis

redis = Redis(host='127.0.0.1', port=6379)

while True:

res = redis.rpop('test_queue')

if res == None:

pass

else:

print str(res)本回答被提问者采纳

场景应用:Redis如何做消息队列?

文章目录

Redis做消息队列

因为Redis中的list 是双向链表也可以当队列的特性

消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性

Redis 的 List 和 Stream 两种数据类型,就可以满足消息队列的这三个需求。我们来了解下基于 List 的消息队列实现方法

消息保序

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

List 可以使用 LPUSH + RPOP (或者反过来,RPUSH+LPOP)命令实现消息队列。

  • 生产者使用 LPUSH key value[value...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。
  • 消费者使用 RPOP key 依次读取队列的消息,先进先出。

不过,在消费者读取数据时,有一个潜在的性能风险点。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个while(1)循环)。如果有新消息写入,RPOP命令就会返回结果,否则,RPOP命令返回空值,再继续循环。

所以,即使没有新消息写入List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

为了解决这个问题,Redis提供了 BRPOP 命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。

处理重复的消息

消费者要实现重复消息的判断,需要 2 个方面的要求:

  • 每个消息都有一个全局的 ID。
  • 消费者要记录已经处理过的消息的 ID。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

例如,我们执行以下命令,就把一条全局 ID 为 111000111、库存量为 99 的消息插入了消息队列:

> LPUSH mq "111000111:stock:99"
(integer) 1

证消息可靠性

当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存

这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

总结

好了,到这里可以知道基于 List 类型的消息队列,满足消息队列的三大需求(消息保序、处理重复的消息和保证消息可靠性)。

  • 消息保序:使用 LPUSH + RPOP;
  • 阻塞读取:使用 BRPOP;
  • 重复消息处理:生产者自行实现全局唯一 ID;
  • 消息的可靠性:使用 BRPOPLPUSH

List 作为消息队列有什么缺陷?

List 不支持多个消费者消费同一条消息,因为一旦消费者拉取一条消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费。

要实现一条消息可以被多个消费者消费,那么就要将多个消费者组成一个消费组,使得多个消费者可以消费同一条消息,但是 List 类型并不支持消费组的实现

这就要说起 Redis 从 5.0 版本开始提供的 Stream 数据类型了,Stream 同样能够满足消息队列的三大需求,而且它还支持「消费组」形式的消息读取。

以上是关于如何使用Redis 做队列操作的主要内容,如果未能解决你的问题,请参考以下文章

场景应用:Redis如何做消息队列?

Redis五种数据结构及常用操作指令Redis在JAVA中如何封装使用

停止 Redis 后如何继续执行 celery 队列,然后再启动它?

如何在 redis 上的 laravel 队列中获取所有待处理的作业?

如何保证最少消费一次redis的list队列数据

消息队列为啥用redis实现