Redis 内置了一个Kafka —— Stream

Posted 360搜索技术团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis 内置了一个Kafka —— Stream相关的知识,希望对你有一定的参考价值。

流(Stream)是Redis从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母 X开头 如 XADD, XLEN, XRANGE等。

在开始详细叙述之前,先说明一下:

本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好, Stream 总是不自觉的写成了 Steam

向Stream中添加数据

命令格式如

 
   
   
 
  1. XADD key ID field string [field string ...]

 
   
   
 
  1. > xadd foo * f1 1 f2 100

  2. "1541558444516-0"

返回的 1541558444516-0 是消息id。 可以看到上面的命令ID参数传的是 *, 代表由系统生成的ID,当然,你可以显式的指定消息ID(基本不太常用),对于这个ID有几点需要注意:

  • ID必须是由 - 连接的两部分,前一部分默认情况下是当前Server当前的毫秒时间戳,后一部分是一个无符号64位长整型序列号;

  • 同一个KEY下,后加入的ID一定要比已加入的ID大。

获取Stream长度

使用 XLEN, 如

 
   
   
 
  1. > xlen foo

  2. (integer) 1

从Stream中获取数据

有三种方式从Stream中获取数据

按照范围查询

包括 XRANGEXREVRANGE 两个命令 ,分别是正序和反序, 以正序 XRANGE为例:

 
   
   
 
  1. XRANGE key start end [COUNT count]

从Stream这个key中返回ID范围是 startend[前count个]数据。 如

 
   
   
 
  1. > xrange foo - +

  2. 1) 1) "1541558444516-0"

  3.   2) 1) "f1"

  4.      2) "1"

  5.      3) "f2"

  6.      4) "100"

  7. 2) 1) "1541559735247-0"

  8.   2) 1) "f1"

  9.      2) "2"

  10.      3) "f2"

  11.      4) "101"

  12. 3) 1) "1541559739871-0"

  13.   2) 1) "f1"

  14.      2) "3"

  15.      3) "f2"

  16.      4) "102"

-, +分别表示最小和最大ID。

监听(XREAD)

XREAD命令

 
   
   
 
  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

 
   
   
 
  1. > xread COUNT 2 STREAMS foo 0

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541558444516-0"

  4.         2) 1) "f1"

  5.            2) "1"

  6.            3) "f2"

  7.            4) "100"

  8.      2) 1) "1541559735247-0"

  9.         2) 1) "f1"

  10.            2) "2"

  11.            3) "f2"

  12.            4) "101"

执行完成后再执行:

 
   
   
 
  1. > xlen foo

  2. (integer) 3

发现Stream的长度没有变化,也就是说, XREAD 不会删除Stream里的数据。

上面的这个例子是一个非阻塞的方式监听。当使用 BLOCK参数,并传递一个超时时间(0为永不超时),将启动一个阻塞方式的监听。 特殊的ID $ 表示从最新的ID开始监听。如:

启动一个监听客户端:

 
   
   
 
  1. > xread block 0 STREAMS foo $

该命令阻塞等待, 此时另起一个客户端:

 
   
   
 
  1. > xadd foo * f1 5 f2 104

  2. "1541560822306-0"

等待的客户端收到消息:

 
   
   
 
  1. > xread COUNT 2 block 60000 STREAMS foo $

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541560822306-0"

  4.         2) 1) "f1"

  5.            2) "5"

  6.            3) "f2"

  7.            4) "104"

  8. (18.24s)

如果有多个客户端都在监听同一个流,这些客户端都可以得到流中的数据。

消费者组 (Consumer Group)

机制说明

涉及三个命令 分别是

  • XGROUP: 创建或者销毁一个 Consumer Group, 也可以从Consumer Group中删除一个 Consumer

  • XREADGROUP: 指定 Consumer Group 中的一个Consumer,消费一条消息

  • XACK: 在 XREADGROUP调用时不指定 NOACK 时需要显式调用 XACK 命令 来确认该消息已被正确处理,可以删除。

消费者组的消费方式可以用下图表示

 
   
   
 
  1.                         |---> consumer1

  2.          |---> group1 --|

  3.          |              |---> consumer2

  4. stream ---|                                                

  5.          |              |---> consumer3

  6.          |---> group2 --|

  7.                         |---> consumer4

一个消息 msg 可通过 group1 和 group2 分发,并且 group1 中的 msg 会被 consumer1 或者 consumer2 消费,group2 中的 msg 会被 consumer3 或者 comsumer4 消费。

创建/消费/确认

使用 XGROUP命令创建一个consumer group,如

 
   
   
 
  1. > xgroup create foo foocg1 $

  2. OK

这样就创建了一个名为foocg1的 consumer group, 其中 $ 表示该组将要消费当前时间开始的消息,然后我们向Stream中添加一些消息:

 
   
   
 
  1. > xadd foo * abc 1

  2. "1541573729250-0"

  3. > xadd foo * abc 2

  4. "1541573730690-0"

  5. > xadd foo * abc 3

  6. "1541573732130-0"

  7. > xadd foo * abc 4

  8. "1541573733693-0"

  9. > xadd foo * abc 5

  10. "1541573735201-0"

  11. > xadd foo * abc 6

  12. "1541573736675-0"

  13. > xadd foo * abc 7

  14. "1541573738869-0"

此时,使用foocg1下的c1消费者来消费一条消息

 
   
   
 
  1. > xreadgroup group foocg1 c1 count 1 streams foo >

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541573729250-0"

  4.         2) 1) "abc"

  5.            2) "1"

其中最后的ID字段 指定为 >, 表示只获取那些从来没有被分发的消息。

我们继续消费一条消息

 
   
   
 
  1. > xreadgroup group foocg1 c1 count 1 streams foo >

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541573730690-0"

  4.         2) 1) "abc"

  5.            2) "2"

然后,再消费历史上所有的数据

 
   
   
 
  1. > xreadgroup group foocg1 c1 count 1 streams foo 0-0

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541573729250-0"

  4.         2) 1) "abc"

  5.            2) "1"

注意这里ID传的是 0-0, 此时会发现消费的是第一条消息。也就是说,没有经过XACK的消息依旧会保留在队列中。

执行 XACK操作:

 
   
   
 
  1. > xack foo foocg1 1541573729250-0

  2. (integer) 1

此时再去消费历史数据

 
   
   
 
  1. > xreadgroup group foocg1 c1 count 1 streams foo 0-0

  2. 1) 1) "foo"

  3.   2) 1) 1) "1541573730690-0"

  4.         2) 1) "abc"

  5.            2) "2"

发现已经获取不到被 XACK的消息了,当所有的历史数据全部被 XACK 后:

 
   
   
 
  1. > xreadgroup group foocg1 c1 streams foo 0-0

  2. 1) 1) "foo"

  3.   2) (empty list or set)

一个伪码表示的客户端

一个消费者组的实现的伪码表示可以写作:

 
   
   
 
  1. client = 初始化客户端

  2. // 数据是否有效

  3. isHistory = true

  4. // 上一个数据的id

  5. lastid = "0-0"

  6. while true:

  7.    if isHistory:

  8.        id = lastid

  9.    else :

  10.        id = ">"

  11.    msgs = client.xreadgroup(从group中以阻塞的方式获取nlastid以后的消息)

  12.    if msgs == null :

  13.        // 此时在过期时间内没有获取到数据

  14.        continue

  15.    if msgs[0][1].length == 0 :

  16.        // 此时旧的消息已经被消费完

  17.        isHistory = false

  18.    foreach msgs as msg:

  19.        id = msg.id

  20.        // 处理msg

  21.        dealwith(msg)

  22.        // 发ACK

  23.        client.xack(group_id, id)

  24.        lastid = id

XPENDING 和 XCLAIM

XPENDING 可以获取消息系统中已经分发但是未被 XACK的消息的情况

 
   
   
 
  1. XPENDING key group [start end count] [consumer]

如:

 
   
   
 
  1. > xpending foo foocg1

  2. 1) (integer) 8

  3. 2) "1541573732130-0"

  4. 3) "1541581755413-0"

  5. 4) 1) 1) "c1"

  6.      2) "8"

表示有8个未确认消息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消费者有8个未确认的消息。传递start, end, count参数可以获取指定范围指定数目的未确认消息的详细信息,传递consumer可获取指定consumer下未确认信息列表。

XCLAIM 可以将未被确认的消息重新声明给其他消费者

 
   
   
 
  1. XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

如下面命令可以获取到一条原属于c1的消息未被确认:

 
   
   
 
  1. > xpending foo foocg1 - + 10

  2. ...

  3. 7) 1) "1541581753377-0"

  4.   2) "c1"

  5.   3) (integer) 30175

  6.   4) (integer) 1

  7. ...

下面命令可以将将原本属于 消费者c1 的消息 1541581753377-0 在等待确认的时间>30000情况下重新声明给c2

 
   
   
 
  1. > xclaim foo foocg1 c2 30000 1541581753377-0

  2. 1) 1) "1541581753377-0"

  3.   2) 1) "abc"

  4.      2) "9"

此时

 
   
   
 
  1. > xpending foo foocg1 - + 10

  2. ...

  3. 7) 1) "1541581753377-0"

  4.   2) "c2"

  5.   3) (integer) 8552

  6.   4) (integer) 1

  7. ...

XPENDINGXCLAIM可以用来处理当一个消费者获取到一个消息后,运行失败导致无法执行 XACK,此时这个消息就永远不会进行确认已消费操作的情形。

其他命令

  • XINFO 可以查看流的一些信息

  • XTRIM 可以获得一个有长度上限的Stream

  • XDEL 可以从Stream中删除消息

这些命令可以在官网找到详细的说明,这里就不再赘述了。

其他说明

  1. Stream支持AOF和RDB格式的持久化

  2. 当调用XDEL等造成Stream长度为0时,为了保留可能存在的Consumer Group信息,Stream不会被删除。

  3. Redis Cluster场景下,由于Key存在于单节点下,所以同一个流的所有消息也会位于同一个节点下。

  4. 由于同一个Stream(Key)下的所有消息位于同一节点,类比Kafka分区更像是使用多个Key形成多个Stream来处理本质上是同一类消息的一个Stream,而不是Stream下的Consumer Group。


以上是关于Redis 内置了一个Kafka —— Stream的主要内容,如果未能解决你的问题,请参考以下文章

redis系列-开篇

redis代替kafka做缓存队列

kafka内置的zookeeper

挑战Kafka!Redis5.0重量级特性Stream尝鲜

我可以设置 Kafka Stream 消费者 group.id 吗?

Kafka性能篇:为何Kafka这么"快"?