Stream入门
Posted F3nGaoXS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Stream入门相关的知识,希望对你有一定的参考价值。
Redis Stream
什么是Stream
Stream是Redis 5.0版本新增加的数据结构,主要用于消息队列(MQ,Message Queue)。
其实Redis本身有一个发布订阅(publish/subscribe)来实现消息队列的功能,但是有一个缺点就是消息无法持久化,如出现网络断开或者Redis宕机,那么消息则会被丢弃。总结就是发布订阅可以分发消息,但是无法记录历史消息。
而Stream则提供了消息的持久化和主备复制的功能,可以让任何客户端访问任何时刻的数据,并且还能记住每一个客户端的访问位置,保证消息不丢失。
Stream的结构
它有一个消息链表,将所有加入的消息都串起来,每个消息都有唯一的 ID和对应内容:
每个Stream(MQ)都有唯一的名称,对应Redis中的key,你甚至可以利用
keys *
列出它们。
- Consumer Group(消费组):消费message的组,一个消费组可以有0到n个消费者(Consumer)
- last_delivered_id(游标):每个消费组都会有个游标,任意一个消费者读取了消息都会使得游标向前移动。
- pending_ids(消费者未确认的消息的id):记录当前已经被客户端读取的消息,但是还没有被ack(Acknowledge character:确认字符)的消息的id
消息(message)相关命令
Xadd 添加消息到末尾
Xadd key ID field value [field value ...]
- key:队列名称,如果不存在则创建队列
- ID:消息id,推荐使用
*
,表示由redis自动生成,如果自定义请确保递增性。 - field value:记录,key-value键值对的方式存储
127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
"1658053697953-0" # 返回由redis自动生成的消息ID
Xdel 删除消息
Xdel key ID [ID ...]
根据ID删除一个或者多个消息
127.0.0.1:6379> Xdel test_queue 1658053697953-0
(integer) 1
Xlen 获取stream包含的消息数量
Xlen key
返回队列的消息个数
127.0.0.1:6379> Xlen test_queue
(integer) 0
127.0.0.1:6379> Xadd test_queue * name xiaoming age 18
"1658053868394-0"
127.0.0.1:6379> Xlen test_queue
(integer) 1
Xrange 列出消息列表(不包括已经删除的消息)
Xrange key start end [COUNT count]
- start:开始值,
-
表示最小值 - end:结束值,
+
表示最大值 - count:数量,默认是列出所有
127.0.0.1:6379> Xrange test_queue - +
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
127.0.0.1:6379> Xadd test_queue * name xiaoming2 age 20
"1658054319414-0"
127.0.0.1:6379> Xrange test_queue - +
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
2) 1) "1658054319414-0"
2) 1) "name"
2) "xiaoming2"
3) "age"
4) "20"
Xrevrange 反向迭代消息列表
Xrevrange key end start [COUNT count]
Xread 以阻塞或非阻塞的方式读取消息
不使用消费组,直接进行消息的读取
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- milliseconds(可选参数):阻塞毫秒数,默认是非阻塞模式
- id:可以指定确切的消息id,也可以使用
0-0
表示用首部开始读
Tips
STREAMS
表示可以同时读取多个队列中的消息
消费组(Xgroup)常用命令
Xgroup create 创建消费组
为队列创建一个从某位置开始读取的消费组
XGROUP [CREATE key groupname id-or-$] [MKSTREAM]
- key:队列名称
- groupname:消费组名
- id-or- : i d 表示从确切的地方开始消费, :id表示从确切的地方开始消费, :id表示从确切的地方开始消费,从尾部开始开始消费(即,只接受新消息)
- MKSTREAM:创建队列。如果在创建消费组的时候key不存在(队列不存在),则同时会创建队列
Tips
如果队列中的消费组(同一队列中,消费组名不能重复)已经存在,则应该会产生异常。
127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
OK # 为test_queue队列创建名为test_group的消费组,从头开始读
127.0.0.1:6379> XGROUP CREATE test_queue test_group 0-0
(error) BUSYGROUP Consumer Group name already exists # 消费组重复,抛出异常
0-0
表示创建消费组时的游标指向此时的首部,切记切记切记
Xgroup delconsumer 删除组中消费者
删除某队列中某消费组的某个消费者
XGROUP [DELCONSUMER key groupname consumername]
- key:队列名称
- groupname:消费组名
- consumername:消费者名
Xgroup destroy 销毁消费组
删除某队列中的某消费组
XGROUP [DESTROY key groupname]
- key:队列名称
- groupname:消费组名
消费组读取(Xreadgroup)
指定消费组中的**消费者(consumer)**读取队列中的消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名
- consumer:消费者名
- count:读取消息的数量
- milliseconds:阻塞毫秒数
- key:队列名
- ID:消息ID
Tips
消费组read后的消息会立马进入pengding,待消息被ack后,消息会从pending中消失
使用名为c1的消费组代表消费组test_group消费消息,由于创建消费组时,游标指向首部,所以使用>
来表示消费首部的右边一个消息,即首个消息。此时消息进入pending
127.0.0.1:6379> XREADGROUP GROUP test_group c1 count 1 STREAMS test_queue >
1) 1) "test_queue"
2) 1) 1) "1658053868394-0"
2) 1) "name"
3) "age"
4) "18"
待确认消息组(Xpending)
xpending key group [[IDLE min-idle-time] start end count [consumer]]
- min-idle-time:限制时间过滤器
- start:从什么地方开始
- end:从什么地方结束
- count:显示数量
- consumer:可选,消费者名
可以查看到刚刚c1消费者消费的消息进入了pending
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c1"
3) (integer) 1073303
4) (integer) 1
待消息被ack后才会从pending列表中移除
转移消息归属权(Xclaim)
可以将进入pending的消息转移给其他消费者(副作用:重置消息的空闲时间)如果两个消费者同时认领消息将永远不会成功,只有一个消费者能成功认领。
Xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
- key:队列名称
- group:消费组名称
- consumer:消费者名称
- min-idle-time:最小闲置时间(单位:毫秒),即只会尝试认领超过该闲置时间的消息。如:消息1的闲置时间是1000,消息2的闲置时间是500,min-idle-time设置为700,那么只有消息1会被重新认领,消息2则不会被重新认领。
- ID:消息的id
127.0.0.1:6379> xpending test_queue test_group - + 10
1) 1) "1658053868394-0"
2) "c2"
3) (integer) 16611
4) (integer) 2
127.0.0.1:6379> Xclaim test_queue test_group c3 86400000 1658053868394-0
(empty array) # 尝试让c3认领消息,但是消息的闲置时间没有超过8640000
127.0.0.1:6379> Xclaim test_queue test_group c3 10000 1658053868394-0
1) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c3"
3) (integer) 70866
4) (integer) 3
可以看到消息已经进入了c3的pending
确认消息(Xack)
确认某队列某消费组的消息
Xack key group ID [ID ...]
127.0.0.1:6379> xpending test_queue test_group - + 99
1) 1) "1658053868394-0"
2) "c3"
3) (integer) 70866
4) (integer) 3
127.0.0.1:6379> Xack test_queue test_group 1658053868394-0
(integer) 1
127.0.0.1:6379> xpending test_queue test_group - + 99
(empty array)
消息被ack之后,就会从pending中消失
详情(Xinfo)常用命令
Xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
stream详情
根据队列名称查看队列详情
Xinfo STREAM key
127.0.0.1:6379> Xinfo STREAM test_queue
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1658054319414-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1658053868394-0"
2) 1) "name"
2) "xiaoming"
3) "age"
4) "18"
13) "last-entry"
14) 1) "1658054319414-0"
2) 1) "name"
2) "xiaoming2"
3) "age"
4) "20"
- “length”:消息的长度(个数)
- “groups”:队列中消费组的个数
- “first-entry”:首消息
- “last-entry”:尾消息
groups详情
查看队列中的所有消费组
Xinfo GROUPS key
127.0.0.1:6379> Xinfo GROUPS test_queue
1) 1) "name"
2) "test_group"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1658053868394-0"
CONSUMERS详情
查看队列中某消费组中的所有消费者
Xinfo CONSUMERS key groupname
127.0.0.1:6379> Xinfo CONSUMERS test_queue test_group
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 164374831
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1759377
3) 1) "name"
2) "c3"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1508038
Tips
idle
是闲置时间,单位是毫秒。
参考链接
本笔记大量内容参考自https://www.runoob.com/redis/redis-stream.html
以上是关于Stream入门的主要内容,如果未能解决你的问题,请参考以下文章