Redis基础 -- 流(stream)类型 和 流(stream)类型的常用命令

Posted CodeJiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis基础 -- 流(stream)类型 和 流(stream)类型的常用命令相关的知识,希望对你有一定的参考价值。

文章目录

1. 流(stream)类型

流(stream)是Redis 5.0版本中新增加的数据结构,也是该版本最重要的更新。在以往的版本中,为了实现消息队列这一常见应用,用户往往会使用列表、有序集合和发布与订阅这3种功能,但这些不同的实现都有各自的缺陷:

  • 列表实现的消息队列虽然可以快速地将新消息追加到列表的末尾,但因为列表为线性结构,所以程序如果想要查找包含指定数据的元素,或者进行范围查找,就需要遍历整个列表。
  • 有序集合虽然可以有效地进行范围查找,但缺少列表和发布与订阅提供的阻塞弹出原语,这使得程序无法使用有序集合去实现可阻塞的消息弹出操作。
  • 发布与订阅虽然拥有将消息传递给多个客户端的能力,并且也拥有相应的阻塞弹出原语,但发布与订阅的 “发送即忘(fire and forget)” 策略会导致离线的客户端丢失消息,所以它是无法实现可靠的消息队列的。

无论是列表、有序集合还是发布与订阅,它们的元素都只能是单个值。换句话说,如果用户想要用这些数据结构实现的消息队列传递多项信息,那么必须使用JSON之类的序列化格式来将多项信息打包存储到单个元素中,然后再在取出元素之后进行相应的反序列化操作。

流是一个包含零个或任意多个流元素的有序队列,队列中的每个元素都包含一个ID和任意多个键值对,这些元素会根据ID的大小在流中有序地进行排列。

Redis流的出现解决了上述提到的所有问题,是使用Redis实现消息队列应用的最佳选择。


1.1 XADD:追加新元素到流的末尾

用户可以通过执行XADD命令,将一个带有指定ID以及包含指定键值对的元素追加到流的末尾。

语法:


XADD命令在成功执行时将返回新元素的ID作为结果。

如果给定的流不存在,那么Redis会先创建一个空白的流,然后将给定的元素追加到流中。

流中的每个元素可以包含一个或任意多个键值对,并且同一个流中的不同元素可以包含不同数量的键值对,比如其中一个元素可以包含3个键值对,而另一个元素则可以包含5个键值对,诸如此类。需要注意的是,与散列以无序方式存储键值对的做法不同,流元素会以有序方式存储用户给定的键值对:用户在创建元素时以什么顺序给定键值对,它们在被取出的时候就是什么顺序。


1.1.1 流元素的ID

流元素的ID由毫秒时间(millisecond)和顺序编号(sequcen number)两部分组成,其中使用UNIX时间戳表示的毫秒时间用于标识与元素相关联的时间,而以0为起始值的顺序编号则用于区分同一时间内产生的多个不同元素。因为毫秒时间和顺序编号都使用64位的非负整数表示,所以整个流ID的总长为128位,而Redis在接受流ID输入以及展示流ID的时候都会使用连字符-分割这两个部分。

示例:

下图展示了一个流元素ID示例。在这个示例中,元素ID的毫秒时间部分为1100000000000,而顺序编号部分则为12345。如果我们使用这个ID作为输入调用以下XADD命令,那么Redis将把包含键值对k1和v1的新元素追加到流s1的末尾:


1.1.2 不完整的流ID

用户在输入流ID的时候,除了可以给出带有毫秒时间和顺序编号的完整流ID之外,还可以给出只包含毫秒时间的不完整流ID:在这种情况下,Redis会自动将ID的顺序编号部分设置为0。

示例:


1.1.3 流元素ID的限制

因为同一个流中的每个元素ID都用于指定特定的一个元素,所以这些ID必须是各不相同的,换句话说,同一个流中的不同元素是不允许使用相同ID的。
除了不允许使用相同的ID之外,Redis还要求新元素的ID必须比流中所有已有元素的ID都要大。具体来说,Redis会记住每个流已有元素的最大ID,并在用户尝试向流里面添加新元素的时候,使用新元素的ID与流目前最大的ID进行对比:

  • 如果新ID的毫秒时间部分比最大ID的毫秒时间部分要大,那么允许添加新元素。
  • 如果新ID的毫秒时间部分与最大ID的毫秒时间部分相同,那么对比两个ID的顺序编号部分,如果新ID的顺序编号部分比最大ID的顺序编号部分要大,那么允许添加新元素。
  • 相反,不符合上述两种情况的添加操作将会被拒绝,并返回一个错误。

示例:

如果我们在流s1已经拥有ID为1100000000000-12345的元素的情况下,尝试添加ID为1000000000000-12345的元素或者ID为1100000000000-100的元素,那么Redis将返回一个错误:

与此相反,如果我们向流中添加ID为1200000000000-0的新元素,那么Redis将正确执行命令,因为新给定的ID比之前的最大ID 1100000000000-12345要大:

在成功执行XADD命令之后,流的最大元素ID也会随之更新。比如,在执行上述命令之后,流s1的最大元素ID就从原来的1100000000000-12345更新到了1200000000000-0。

只执行追加操作的流:

通过将元素ID与时间进行关联,并强制要求新元素的ID必须大于旧元素的ID,Redis从逻辑上将流变成了一种只执行追加操作(append only)的数据结构,这种特性对于使用流实现消息队列和事件系统的用户来说是非常重要的:用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。

此外,只能将新元素添加到末尾而不允许在数据结构的“中间”添加新元素,这也是流与列表以及有序集合之间的一个显著区别。


1.1.4 自动生成元素ID

Redis流对元素ID的要求非常严格,并且还会拒绝不符合规则的ID。为了方便用户执行添加操作,Redis为XADD命令的id参数设定了一个特殊值:当用户将符号*用作id参数的值时,Redis将自动为新添加的元素生成一个可用的新ID。

具体来说,自动生成的新ID会将Redis所在宿主机器当前毫秒格式的UNIX时间戳用作ID的毫秒时间,并根据当前已有ID的最大顺序编号来设置新ID的顺序编号部分。

  • 如果在当前毫秒之内还没有出现过任何ID,那么新ID的顺序编号将被设置为0。比如,如果当前时间为1530200000000ms,并且在这一毫秒之内没有任何现存的ID,那么新元素的ID将被设置为1530200000000-0。
  • 如果在当前毫秒内已经存在其他ID,那么这些ID中顺序编号最大的那个加上1就是新ID的顺序编号。比如,如果在1530200000000ms内,现存最大的顺序编号为10086,那么新ID的顺序编号将是10086加1的结果10087,而完整的新ID则是1530200000000-10087。

示例:


1.1.5 限制流的长度

XADD命令提供了MAXLEN选项,让用户可以在添加新元素的同时删除旧元素,以此来限制流的长度。

在将新元素追加到流的末尾之后,XADD命令就会按照MAXLEN选项指定的长度,按照先进先出规则移除超出长度限制的元素。

示例:

对于下图所示的流来说:

如果我们执行以下命令:

执行后:


1.1.6 时间复杂度说明

复杂度:O(log (N)),其中N为流目前包含的元素数量。


1.2 XTRIM:对流进行修剪

用户除了可以在执行XADD命令的同时使用MAXLEN命令对流进行修剪之外,还可以通过执行XTRIM命令直接将流修剪至指定长度。

语法:

XTRIM命令在执行之后会返回被移除元素的数量作为结果。

示例:

对于下图所示的流来说:

如果我们执行以下命令:

执行后:

XTRIM命令与带有MAXLEN选项的XADD命令一样,都是根据先进先出规则来淘汰旧元素的。

说明:

复杂度:O(log (N) + M),其中N为执行修剪操作前流包含的元素数量,而M则为被移除元素的数量。


1.3 XDEL:移除指定元素

XDEL命令接受一个流以及任意多个元素ID作为输入,并从流中移除ID对应的元素。

语法:

XDEL命令在成功执行之后将返回被移除元素的数量作为结果。

示例:

对于下图所示的流来说:

如果我们执行以下命令:

执行后:

说明:

复杂度:O(log (N)*M),其中N为流包含的元素数量,而M则为被移除元素的数量。


1.4 XLEN:获取流包含的元素数量

用户可以通过对流执行XLEN命令,获取流目前包含的元素数量。

语法:


如果给定的流没有包含任何元素,或者流并不存在,那么XLEN命令将返回0作为结果。

示例:

对于下图所示的流来说:

如果我们执行以下命令:

说明:

复杂度:O(1)。


1.5 XRANGE、XREVRANGE:访问流中元素

流本质上是一个有序序列,对于这种序列,使用有序方式获取序列中的各项元素是一种非常常见的操作。正如Redis为另一种有序序列数据结构列表提供了LRANGE命令一样,Redis也为流提供了XRANGE命令,这个命令可以以遍历或者迭代的方式,访问流中的单个或者任意多个元素。

语法:

说明:

下面展示的命令都以该流为基础


1.5.1 获取ID指定的单个元素

XRANGE命令最简单的用法就是将命令的起始ID和结束ID设置为同一个流元素ID,这样XRANGE命令就会从流中获取并返回ID指定的元素。

示例:

获取s1流中id为1651732210821-0的元素:

如果用户给定的ID不存在,那么XRANGE命令将返回一个空列表作为结果:


1.5.2 获取指定ID范围内的多个元素

XRANGE命令除了可以用于获取单个元素之外,还可以用于获取多个元素:用户只需要将一个较小的元素ID设置为命令的起始ID,并将一个较大的元素ID设置为命令的结束ID,那么XRANGE命令就会从流中获取从起始ID到结束ID区间范围内的所有元素。

示例:

获取流中ID从1651732213026-0开始到1651732763727-0在内的所有元素:


1.5.3 获取所有元素

XRANGE命令的起始ID和结束ID除了可以是流元素的ID之外,还可以是特殊值减号-和加号+,其中前者用于表示流中的最小ID,而后者则用于表示流中的最大ID。在这两个特殊值的帮助下,用户可以通过XRANGE命令获取流包含的所有元素,或者ID大于等于或小于等于指定ID的所有元素。

示例:

通过执行以下命令,我们可以获取流s1包含的所有元素:

也通过执行以下命令,获取流s1中ID大于等于1651732216023-0的所有元素:
还可以通过执行以下命令,获取流temp-stream中ID小于等于1651732216023-0的所有元素:


1.5.4 获取指定数量的元素

虽然使用XRANGE命令可以很简单地获取多个元素,但是对于一些长度非常大的流来说,一次返回数量众多的元素可能会导致客户端因为体积庞大的回复而被阻塞,这并不是一件好事。

用户可以通过XRANGE命令的COUNT选项去限制一次命令调用能够返回的最大元素数量:

示例:

通过执行以下命令,我们可以只获取s1流中ID最小的3个元素:

又或者只获取ID大于等于1651732210821-0的前两个元素:


1.5.5 对流进行迭代

用户可以通过XRANGE命令对流进行迭代,具体步骤如下:

  1. 使用-作为起始ID,+作为结束ID,调用带有COUNT选项的XRANGE命令,获取流的前N个元素。
  2. 对于命令返回的最后一个元素,将该元素ID的顺序部分加1,得到一个新ID。
  3. 使用新ID作为起始ID, +作为结束ID,继续调用带有COUNT选项的XRANGE命令。
  4. 重复步骤2和3,直到XRANGE命令返回空列表为止,返回空列表表示整个流已经被迭代完毕。

示例:

对于下图所示的流来说,我们可以通过执行以下命令,以-为起始ID, +为结束ID,并将COUNT选项的值设置为3,以此来获取dense-stream流最开始的3个元素:


因为这次迭代获得的最后一个元素ID为1000000000000-2,所以我们只需要将这个ID的顺序编号加上1,就可以得到下一次迭代的起始ID 1000000000000-3了。与上一次迭代一样,命令接受的结束ID也是特殊值+:

第二次迭代返回的最后一个元素的ID为2000000000000-1,我们再次将这个ID的顺序编号加上1,然后将这个新ID 2000000000000-2用作起始ID,再次执行迭代操作:

跟之前一样,第三次迭代也获得了ID为4000000000000-1的最后元素,我们将这个ID的顺序编号加上1,得到新ID 4000000000000-2,然后再次进行迭代:
与之前3次不一样,XRANGE命令这次返回了一个空列表,这表明整个流已经被迭代完毕,迭代到此结束。下图展示了上述整个迭代流程。


1.5.6 以逆序访问流中元素

XREVRANGE命令是XRANGE命令的逆序版本,前者除了会按照ID从大到小而不是从小到大的顺序访问流元素之外,其他作用后者是相同的。

语法:


另外需要注意的是,XREVRANGE命令先接受结束ID,后接受起始ID,这种做法跟XRANGE命令正好相同。

示例:

获取流s1中ID最大的3个元素:


1.5.7 时间复杂度说明

复杂度:O(log (N) + M),其中N为流包含的元素数量,而M则为命令返回的元素数量。


1.6 XREAD:以阻塞或非阻塞方式获取流元素

除了XRANGE命令和XREVRANGE命令之外,Redis还提供了XREAD命令用于获取流中元素。

语法:

与XRANGE命令和XREVRANGE命令可以从两个方向对流进行迭代不同,XREAD命令只能从一个方向对流进行迭代,但是它提供了更简单的迭代API,支持同时对多个流进行迭代,并且能够以阻塞和非阻塞两种方式执行。

说明:

下面演示的案例都基于s1,s2,s3流。


1.6.1 从多个流中获取大于指定ID的元素

XREAD命令最基础的用法就是从多个给定流中取出大于指定ID的多个元素,其中紧跟在STREAMS选项之后的就是流的名字以及与之相对应的元素ID。

在调用XREAD命令时,用户需要先给定所有想要从中获取元素的流,然后再给出与各个流相对应的ID。除此之外,用户还可以通过可选的COUNT选项限制命令对于每个流最多可以返回多少个元素。

注意,我们把COUNT选项放在了STREAMS选项的前面,这是因为STREAMS选项是一个可变参数选项,它接受的参数数量是不固定的,所以它必须是XREAD命令的最后一个选项。

示例:

以下代码展示了如何从流s1中取出最多三个ID大于1651736697161-0的元素:

这个命令调用返回了一个只包含单个项的列表,而这个列表项又包含了两个子项,其中:

  • 第一个子项"s1"表明了这个列表项中的元素都是从流s1里面获取的。
  • 第二个子项包含了三个列表项,其中每一个列表项都是一个流元素。

与此类似,以下代码展示了如何从流s1、s2和s3中各取出一个ID大于1651736697161-0的元素:

如果用户尝试使用XREAD命令去获取一个不存在的流,或者给定的ID超过了流中已有元素的最大ID,那么命令将返回一个空值(nil)作为结果。


1.6.2 迭代流

与XRANGE命令和XREVRANGE命令类似,通过XREAD命令,我们同样可以对一个或多个流进行迭代,具体方法如下:

  • 将表示流起点的特殊ID 0-0(或者它的简写0)作为ID传入XREAD命令,并通过COUNT选项读取流最开头的N个元素。
  • 使用命令返回的最后一个元素的ID作为参数,再次调用带有COUNT选项的XREAD命令。
  • 重复执行步骤2,直到命令返回空值或者命令返回元素的数量少于指定数量为止。

示例:

假设我们想要以两个元素为步进迭代流s1,那么首先需要执行以下命令:

这个命令会从流s1里面取出位于流最开始的两个元素,它们的ID分别为1651736697161-0和1651736703259-0。为了进行下一次迭代,我们需要将后一个ID用作参数,继续调用XREAD命令:

与之前一样,这次的调用也返回了两个流元素。为了继续进行第三次迭代,我们需要将ID 1651736709596-0用作参数继续调用XREAD命令:

注意,与之前两次迭代都返回了两个元素不一样,虽然这次迭代也请求获取两个元素,但命令却只返回了一个元素,这表明对整个流的迭代已经完成了。

对比3个迭代命令:


1.6.3 阻塞

通过使用BLOCK选项并给定一个毫秒精度的超时时间作为参数,用户能够以可阻塞的方式执行XREAD命令。

BLOCK选项的值可以是任何大于等于0的数值,给定0则表示阻塞直到出现可返回的元素为止。给定大于0的参数表示最多等待参数毫秒,超过时间没有可获取的元素会返回nil。

示例:


1.6.4 只获取新出现的元素

Redis为XREAD命令提供了特殊ID参数$符号,用户在执行阻塞式的XREAD命令时,只要将$符号用作ID参数的值,XREAD命令就会只获取给定流在命令执行之后新出现的元素。

示例:

假设我们现在想要获取流s1接下来将要出现的第一个新元素,那么可以执行以下命令,执行这个调用的客户端将进入阻塞状态。在此之后,如果在给定的时限内,有另一个客户端向流s1推入新元素,那么原客户端的阻塞状态就会被解除,并返回被推入的元素,就像这样:



1.6.5 时间复杂度说明

复杂度:对于用户给定的每个流,获取流元素的复杂度为O(log (N) + M),其中N为流包含的元素数量,M为被获取的元素数量。因此对于用户给定的I个流,获取流元素的总复杂度为O((log (N) + M)*I)。



以上是关于Redis基础 -- 流(stream)类型 和 流(stream)类型的常用命令的主要内容,如果未能解决你的问题,请参考以下文章

Java基础 | Stream流原理与用法总结

Node篇

.NET基础拾遗字符串集合和流3

Stream 流操作

Java基础之Stream流(JDK1.8新特性)

流--Stream