Redis学习笔记——消息队列的考验:Redis有哪些解决方案?
Posted qq_34132502
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis学习笔记——消息队列的考验:Redis有哪些解决方案?相关的知识,希望对你有一定的参考价值。
现在的互联网应用基本上都是采用分布式系统架构进行设计的,而很多分布式系统必备的一个基础软件就是消息队列。
消息队列要能支持组件通信消息的快速读写,而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个问题:“Redis 适合做消息队列吗?”
消息队列的消息存取需求
我先介绍一下消息队列存取消息的过程。在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。
我们一般把消息队列中发送消息的组件称为生产者,把接收消息的组件称为消费者,下图展示了一个通用的消息队列的架构模型:
不过,消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
需求一:消息保序
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息。
需求二:重复消息处理
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。这当然也是无法接受的。
需求三:消息可靠性保证
另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
基于List的消息队列解决方案
消息保序
List 本身就是按先进先出的顺序对数据进行存取的,所以可以做到消息保序的需求。
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,我已我们需要while(1)循环不停地RPOP
访问,造成CPU空转,严重影响性能。
为了解决这个问题,Redis 提供了BRPOP
命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。
重复消息处理
这里其实有一个要求:消费者程序本身能对重复消息进行判断。
一方面,消息队列要能给每一个消息提供全局唯一的 ID 号;另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。
不过,List 本身是不会为每个消息生成 ID 号的,所以,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。
消息可靠性保证
当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。
为了留存消息,List 类型提供了BRPOPLPUSH
命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
但是,在用 List 做消息队列时,我们还可能遇到过一个问题:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。
基于Streams的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
XADD
XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。
消息的全局唯一 ID 由两部分组成,第一部分“是数据插入时,以毫秒为单位计算的当前服务器时间;第二部分表示插入消息在当前毫秒内的消息序号
XREAD
消费者也可以在调用 XRAED 时设定block
配置项,实现类似于 BRPOP 的阻塞读取操作
XGROUP
上面两个是List中也支持的,这是Streams特有的功能。
Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
XPENDING 和 XACK
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
小结
如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。
如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理(例如,一个消息是一条从业务系统采集的数据,既要被消费者 1 读取进行实时计算,也要被消费者 2 读取并留存到分布式文件系统 HDFS 中,以便后续进行历史查询),你会使用 Redis 的什么数据类型来解决这个问题呢?
这种情况下,只能使用Streams数据类型来解决。使用Streams数据类型,创建多个消费者组,就可以实现同时消费生产者的数据。每个消费者组内可以再挂多个消费者分担读取消息进行消费,消费完成后,各自向Redis发送XACK,标记自己的消费组已经消费到了哪个位置,而且消费组之间互不影响。
以上是关于Redis学习笔记——消息队列的考验:Redis有哪些解决方案?的主要内容,如果未能解决你的问题,请参考以下文章
Redis个人笔记:Redis应用场景,Redis常见命令,Reids缓存击穿穿透,Redis分布式锁实现方案,秒杀设计思路,Redis消息队列,Reids持久化,Redis主从哨兵分片集群