Redis 内置了一个Kafka —— Stream
Posted 360搜索技术团队
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis 内置了一个Kafka —— Stream相关的知识,希望对你有一定的参考价值。
流(Stream)是Redis从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母 X
开头 如 XADD
, XLEN
, XRANGE
等。
在开始详细叙述之前,先说明一下:
本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好, Stream
总是不自觉的写成了 Steam
。
向Stream中添加数据
命令格式如
XADD key ID field string [field string ...]
如
> xadd foo * f1 1 f2 100
"1541558444516-0"
返回的 1541558444516-0
是消息id。 可以看到上面的命令ID参数传的是 *
, 代表由系统生成的ID,当然,你可以显式的指定消息ID(基本不太常用),对于这个ID有几点需要注意:
ID必须是由
-
连接的两部分,前一部分默认情况下是当前Server当前的毫秒时间戳,后一部分是一个无符号64位长整型序列号;同一个KEY下,后加入的ID一定要比已加入的ID大。
获取Stream长度
使用 XLEN
, 如
> xlen foo
(integer) 1
从Stream中获取数据
有三种方式从Stream中获取数据
按照范围查询
包括 XRANGE
和 XREVRANGE
两个命令 ,分别是正序和反序, 以正序 XRANGE
为例:
XRANGE key start end [COUNT count]
从Stream这个key中返回ID范围是 start
到 end
的 [前count个]
数据。 如
> xrange foo - +
1) 1) "1541558444516-0"
2) 1) "f1"
2) "1"
3) "f2"
4) "100"
2) 1) "1541559735247-0"
2) 1) "f1"
2) "2"
3) "f2"
4) "101"
3) 1) "1541559739871-0"
2) 1) "f1"
2) "3"
3) "f2"
4) "102"
-
, +
分别表示最小和最大ID。
监听(XREAD)
XREAD
命令
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
如
> xread COUNT 2 STREAMS foo 0
1) 1) "foo"
2) 1) 1) "1541558444516-0"
2) 1) "f1"
2) "1"
3) "f2"
4) "100"
2) 1) "1541559735247-0"
2) 1) "f1"
2) "2"
3) "f2"
4) "101"
执行完成后再执行:
> xlen foo
(integer) 3
发现Stream的长度没有变化,也就是说, XREAD
不会删除Stream里的数据。
上面的这个例子是一个非阻塞的方式监听。当使用 BLOCK
参数,并传递一个超时时间(0为永不超时),将启动一个阻塞方式的监听。 特殊的ID $
表示从最新的ID开始监听。如:
启动一个监听客户端:
> xread block 0 STREAMS foo $
该命令阻塞等待, 此时另起一个客户端:
> xadd foo * f1 5 f2 104
"1541560822306-0"
等待的客户端收到消息:
> xread COUNT 2 block 60000 STREAMS foo $
1) 1) "foo"
2) 1) 1) "1541560822306-0"
2) 1) "f1"
2) "5"
3) "f2"
4) "104"
(18.24s)
如果有多个客户端都在监听同一个流,这些客户端都可以得到流中的数据。
消费者组 (Consumer Group)
机制说明
涉及三个命令 分别是
XGROUP
: 创建或者销毁一个 Consumer Group, 也可以从Consumer Group中删除一个 ConsumerXREADGROUP
: 指定 Consumer Group 中的一个Consumer,消费一条消息XACK
: 在XREADGROUP
调用时不指定NOACK
时需要显式调用XACK
命令 来确认该消息已被正确处理,可以删除。
消费者组的消费方式可以用下图表示
|---> consumer1
|---> group1 --|
| |---> consumer2
stream ---|
| |---> consumer3
|---> group2 --|
|---> consumer4
一个消息 msg 可通过 group1 和 group2 分发,并且 group1 中的 msg 会被 consumer1 或者 consumer2 消费,group2 中的 msg 会被 consumer3 或者 comsumer4 消费。
创建/消费/确认
使用 XGROUP
命令创建一个consumer group,如
> xgroup create foo foocg1 $
OK
这样就创建了一个名为foocg1的 consumer group, 其中 $
表示该组将要消费当前时间开始的消息,然后我们向Stream中添加一些消息:
> xadd foo * abc 1
"1541573729250-0"
> xadd foo * abc 2
"1541573730690-0"
> xadd foo * abc 3
"1541573732130-0"
> xadd foo * abc 4
"1541573733693-0"
> xadd foo * abc 5
"1541573735201-0"
> xadd foo * abc 6
"1541573736675-0"
> xadd foo * abc 7
"1541573738869-0"
此时,使用foocg1下的c1消费者来消费一条消息
> xreadgroup group foocg1 c1 count 1 streams foo >
1) 1) "foo"
2) 1) 1) "1541573729250-0"
2) 1) "abc"
2) "1"
其中最后的ID字段 指定为 >
, 表示只获取那些从来没有被分发的消息。
我们继续消费一条消息
> xreadgroup group foocg1 c1 count 1 streams foo >
1) 1) "foo"
2) 1) 1) "1541573730690-0"
2) 1) "abc"
2) "2"
然后,再消费历史上所有的数据
> xreadgroup group foocg1 c1 count 1 streams foo 0-0
1) 1) "foo"
2) 1) 1) "1541573729250-0"
2) 1) "abc"
2) "1"
注意这里ID传的是 0-0
, 此时会发现消费的是第一条消息。也就是说,没有经过XACK的消息依旧会保留在队列中。
执行 XACK
操作:
> xack foo foocg1 1541573729250-0
(integer) 1
此时再去消费历史数据
> xreadgroup group foocg1 c1 count 1 streams foo 0-0
1) 1) "foo"
2) 1) 1) "1541573730690-0"
2) 1) "abc"
2) "2"
发现已经获取不到被 XACK
的消息了,当所有的历史数据全部被 XACK
后:
> xreadgroup group foocg1 c1 streams foo 0-0
1) 1) "foo"
2) (empty list or set)
一个伪码表示的客户端
一个消费者组的实现的伪码表示可以写作:
client = 初始化客户端
// 数据是否有效
isHistory = true
// 上一个数据的id
lastid = "0-0"
while true:
if isHistory:
id = lastid
else :
id = ">"
msgs = client.xreadgroup(从group中以阻塞的方式获取n个lastid以后的消息)
if msgs == null :
// 此时在过期时间内没有获取到数据
continue
if msgs[0][1].length == 0 :
// 此时旧的消息已经被消费完
isHistory = false
foreach msgs as msg:
id = msg.id
// 处理msg
dealwith(msg)
// 发ACK
client.xack(group_id, id)
lastid = id
XPENDING 和 XCLAIM
XPENDING
可以获取消息系统中已经分发但是未被 XACK
的消息的情况
XPENDING key group [start end count] [consumer]
如:
> xpending foo foocg1
1) (integer) 8
2) "1541573732130-0"
3) "1541581755413-0"
4) 1) 1) "c1"
2) "8"
表示有8个未确认消息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消费者有8个未确认的消息。传递start, end, count参数可以获取指定范围指定数目的未确认消息的详细信息,传递consumer可获取指定consumer下未确认信息列表。
XCLAIM
可以将未被确认的消息重新声明给其他消费者
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
如下面命令可以获取到一条原属于c1的消息未被确认:
> xpending foo foocg1 - + 10
...
7) 1) "1541581753377-0"
2) "c1"
3) (integer) 30175
4) (integer) 1
...
下面命令可以将将原本属于 消费者c1 的消息 1541581753377-0 在等待确认的时间>30000情况下重新声明给c2
> xclaim foo foocg1 c2 30000 1541581753377-0
1) 1) "1541581753377-0"
2) 1) "abc"
2) "9"
此时
> xpending foo foocg1 - + 10
...
7) 1) "1541581753377-0"
2) "c2"
3) (integer) 8552
4) (integer) 1
...
XPENDING
和 XCLAIM
可以用来处理当一个消费者获取到一个消息后,运行失败导致无法执行 XACK
,此时这个消息就永远不会进行确认已消费操作的情形。
其他命令
XINFO
可以查看流的一些信息XTRIM
可以获得一个有长度上限的StreamXDEL
可以从Stream中删除消息
这些命令可以在官网找到详细的说明,这里就不再赘述了。
其他说明
Stream支持AOF和RDB格式的持久化
当调用XDEL等造成Stream长度为0时,为了保留可能存在的Consumer Group信息,Stream不会被删除。
Redis Cluster场景下,由于Key存在于单节点下,所以同一个流的所有消息也会位于同一个节点下。
由于同一个Stream(Key)下的所有消息位于同一节点,类比Kafka分区更像是使用多个Key形成多个Stream来处理本质上是同一类消息的一个Stream,而不是Stream下的Consumer Group。
以上是关于Redis 内置了一个Kafka —— Stream的主要内容,如果未能解决你的问题,请参考以下文章