Redis 实现 消息队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis 实现 消息队列相关的知识,希望对你有一定的参考价值。

参考技术A 虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

发送消息日志

消费消息日志

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

XADD:插入消息,保证有序,可以自动生成全局唯一 ID;

XREAD:用于读取消息,可以按 ID 读取数据;

XREADGROUP:按消费组形式读取消息;

XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,

XACK:XACK 命令用于向消息队列确认消息处理已完成。

监听器

监听器容器

初始化消息队列(integral-queue)和消费者组(integral-group),执行RedisMqTest.initMessageQueue()即可

可参考List实现的处理方式

可使用RedisMqTest.getPendingQueue()获取未成功消费的消息队列进行处理

redis怎么做消息队列

参考技术A redis只适合做缓存,做消息队列你可以看看activeMQ

以上是关于Redis 实现 消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Redis的基本使用(二) 消息队列

Redis 实现 消息队列

Redis(五)-特性-消息队列

redis怎么做消息队列

Redis 竟然能用 List 实现消息队列

使用redis实现消息队列