6Redis高级特性(发布和订阅Stream)
Posted *King*
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了6Redis高级特性(发布和订阅Stream)相关的知识,希望对你有一定的参考价值。
一、发布和订阅
1、定义
Redis提供了“发布、订阅”模式的消息机制,其中消息订阅者与发布者不直接通信,发布者向指定的频道(channel)发布消息,订阅该频道的每个客户端都可以接收到消息
2、操作命令
发布消息
publish channel message
返回值是热闹收到信息的订阅者数量
订阅消息
subscribe channel [channel ...]
订阅者可以订阅一个或多个频道,如果此时另一个客户端发布一条消息,当前订阅者客户端会收到消息。
如果有多个客户端同时订阅了同一个频道,都会收到消息
此时生产者再发布一条消息
两个订阅者都能收到消息
需要注意的是:客户端在执行订阅命令之后进入了订阅状态,只能接收subscribe、
psubscribe,unsubscribe、 punsubscribe 的四个命令。
查看频道订阅数
pubsub numsub channel
查看模式订阅数
pubsub channels
取消订阅
unsubscribe [channel [channel ...]]
按模式订阅和取消订阅
psubscribe ch*
punsubscribe ch*
3、使用声景
需要消息解耦又并不关注消息可靠性的地方都可以使用发布订阅模式。
- 今日头条订阅号、微信订阅公众号、新浪微博关注、邮件订阅系统
- 即使通信系统
- 群聊部落系统(微信群)
4、缺点
PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一 个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
Redis 的发布订阅很粗糙,因此无法实现消息堆积和回溯。
5、基于pub/sub实现消息队列
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
/*
* 基于PUBSUB的消息中间件的实现
* */
@Component
public class PSVer extends JedisPubSub
public final static String RS_PS_MQ_NS = "rpsm:";
@Autowired
private JedisPool jedisPool;
@Override
public void onMessage(String channel, String message)
System.out.println("Accept "+channel+" message:"+message);
@Override
public void onSubscribe(String channel, int subscribedChannels)
System.out.println("Subscribe "+channel+" count:"+subscribedChannels);
public void pub(String channel, String message)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
jedis.publish(RS_PS_MQ_NS+channel,message);
System.out.println("发布消息到"+RS_PS_MQ_NS+channel+" message="+message);
catch (Exception e)
throw new RuntimeException("发布消息失败!");
finally
jedis.close();
public void sub(String... channels)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
jedis.subscribe(this,channels);
catch (Exception e)
throw new RuntimeException("订阅频道失败!");
finally
jedis.close();
测试类
@SpringBootTest
public class TestPSVer
@Autowired
private PSVer psVer;
@Test
void testSub()
psVer.sub(PSVer.RS_PS_MQ_NS+"psmq", PSVer.RS_PS_MQ_NS+"psmq2");
@Test
void testPub()
psVer.pub("psmq","msgtest");
psVer.pub("psmq2","msgtest2");
测试结果
二、Stream
1、定义
Redis5.0最大的新特性。Stream支持多播的可持久化的消息队列
Redis Stream 的结构如上图所示,每一个Stream都有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。
每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream 的某个消息 ID 开始消费,这个 ID用来初始化last_delivered_id变量。
每个消费组 (Consumer Group)的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。
同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。
消费者 (Consumer)内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有ack 的消息。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。
2、操作命令
生产端
xadd 追加消息
xdel 删除消息,这里的删除仅仅是设置了标志位,不会实际删除消息。
xrange 获取消息列表,会自动过滤已经删除的消息
xlen 消息长度
del 删除 Stream
* 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
xrange streamtest - +
其中-表示最小值 , + 表示最大值
也可以指定消息ID的列表
xdel streamtest 1632315706849-0
xlen streamtest
del streamtest //删除整个Stream
消费端
虽然 Stream 中有消费者组的概念,但是可以在不定义消费组的情况下进行Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令 xread,可以将 Stream当成普通的消息队列 (list) 来使用。
从 Stream 头部读取 1 条消息,0-0 指从头开始
xread count 1 streams streamtest 0-0
生产端生产消息
消费端消费消息
指定从 streams 的消息 Id 开始(不包括命令中的消息 id)
xread count 2 streams streamtest 1632359197345-0
$代表从尾部读取,上面的意思就是从尾部读取最新的一条消息,此时默认不返回任何消息
xread count 1 streams streamtest $
以阻塞的方式读取尾部最新的一条消息,直到新的消息的到来。block 后面的数字代表阻塞时间,单位毫秒
xread block 0 count 1 streams streamtest $
消费组
创建消费组,0-0表示从头开始消费
xgroup create streamtest cg1 0-0
$ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
xgroup create streamtest cg2 $
xinfo命令可查看stream2的情况
xinfo stream streamtest
查看消费组的情况
xinfo groups streamtest
消息消费
xreadgroup GROUP cg1 c1 count 1 streams streamtest >
> 号表示从当前消费组的 last_delivered_id 后面开始读,每当消费者读取一条消息,last_delivered_id 变量就会前进
设置阻塞等待
xreadgroup GROUP cg1 c1 block 0 count 1 streams streamtest >
如果同一个消费组有多个消费者,可通过xinfo consumers 指令观察每个消费者的状态
xinfo consumers streamtest cg1
确认消息
xack streamtest cg1 1626751586744-0
3、基于Stream实现消息队列
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.params.XReadGroupParams;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 实现消费组消费,不考虑单消费者模式
*/
@Component
public class StreamVer
public final static String RS_STREAM_MQ_NS = "rsm:";
@Autowired
private JedisPool jedisPool;
/**
* 发布消息到Stream
* @param key
* @param message
* @return
*/
public StreamEntryID produce(String key,Map<String,String> message)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
System.out.println("发布消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
return id;
catch (Exception e)
throw new RuntimeException("发布消息失败!");
finally
jedis.close();
/**
* 创建消费群组,消费群组不可重复创建
* @param key
* @param groupName
* @param lastDeliveredId
*/
public void createCustomGroup(String key, String groupName, String lastDeliveredId)
Jedis jedis = null;
try
StreamEntryID id = null;
if (lastDeliveredId==null)
lastDeliveredId = "0-0";
id = new StreamEntryID(lastDeliveredId);
jedis = jedisPool.getResource();
/*makeStream表示没有时是否自动创建stream,但是如果有,再自动创建会异常*/
jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
System.out.println("创建消费群组成功:"+groupName);
catch (Exception e)
throw new RuntimeException("创建消费群组失败!",e);
finally
jedis.close();
/**
* 消息消费
* @param key
* @param customerName
* @param groupName
* @return
*/
public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
/*消息消费时的参数*/
XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
Map<String, StreamEntryID> streams = new HashMap<>();
streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> result
= jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
System.out.println(groupName+"从"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
return result;
catch (Exception e)
throw new RuntimeException("消息消费失败!",e);
finally
jedis.close();
/**
* 消息确认
* @param key
* @param groupName
* @param msgId
*/
public void ackMsg(String key, String groupName,StreamEntryID msgId)
if (msgId==null) throw new RuntimeException("msgId为空!");
Jedis jedis = null;
try
jedis = jedisPool.getResource();
System.out.println(jedis.xack(key,groupName,msgId));
System.out.println(RS_STREAM_MQ_NS+key+",消费群组"+groupName+" 消息已确认");
catch (Exception e)
throw new RuntimeException("消息确认失败!",e);
finally
jedis.close();
/*
检查消费者群组是否存在,辅助方法
* */
public boolean checkGroup(String key, String groupName)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
for(StreamGroupInfo groupinfo : xinfoGroupResult)
if(groupName.equals(groupinfo.getName())) return true;
return false;
catch (Exception e)
throw new RuntimeException("检查消费群组失败!",e);
finally
jedis.close();
public final static int MQ_INFO_CONSUMER = 1;
public final static int MQ_INFO_GROUP = 2;
public final static int MQ_INFO_STREAM = 0;
/**
* 消息队列信息查看
* @param type
*/
public void MqInfo(int type,String key, String groupName)
Jedis jedis = null;
try
jedis = jedisPool.getResource();
if(type==MQ_INFO_CONSUMER)
List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
System.out.println(RS_STREAM_MQ_NS+key+" 消费者信息:" + xinfoConsumersResult);
for( StreamConsumersInfo consumersinfo : xinfoConsumersResult)
System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
System.out.println("--Name:" + consumersinfo.getName());
System.out.println("--Pending:" + consumersinfo.getPending());
System.out.println("--Idle:" + consumersinfo.getIdle());
else if (type==MQ_INFO_GROUP)
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"消费者群组信息:" + xinfoGroupResult);
for(StreamGroupInfo groupinfo : xinfoGroupResult)
System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
System.out.println("--Name:" + groupinfo.getName());
System.out.println("--Consumers:" + groupinfo.getConsumers());
System.out.println("--Pending:" + groupinfo.getPending());
System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
else
StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"队列信息:" + xinfoStreamResult);
System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
System.out.println("--Length:" + xinfoStreamResult.getLength());
System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
System.out.println("--Groups:" + xinfoStreamResult.getGroups());
System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
catch (Exception e)
throw new RuntimeException("消息队列信息检索失败!",e);
finally
jedis.close();
测试类
import cn.enjoyedu.redis.redismq.StreamVer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import redis.clients.jedis.StreamEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@SpringBootTest
public class TestStreamVer
@Autowired
private StreamVer streamVer;
private final static String KEY_NAME = "testStream";
private final static String GROUP_NAME = "testgroup";
@Test
void testProduce()
Map<String,String> message = new HashMap<>();
message.put("name","King");
message.put("age","18");
streamVer.produce(KEY_NAME,new HashMap<>(message));
streamVer.MqInfo(StreamVer.MQ_INFO_STREAM,KEY_NAME,null);
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,null);
@Test
void testConsumer()
if (!streamVer.checkGroup(KEY_NAME,GROUP_NAME))
streamVer.createCustomGroup(KEY_NAME,GROUP_NAME,null);
List<Map.Entry<String, List<StreamEntry>>> results = streamVer.consume(KEY_NAME,"testUser",GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
for(Map.Entry<String, List<StreamEntry>> result:results )
for(StreamEntry entry:result.getValue())
streamVer.ackMsg(KEY_NAME,GROUP_NAME,entry.getID());
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
@Test
void testAck()
streamVer.ackMsg(KEY_NAME,GROUP_NAME,null);
streamVer.MqInfo(StreamVer.MQ_INFO_GROUP,KEY_NAME,GROUP_NAME);
streamVer.MqInfo(StreamVer.MQ_INFO_CONSUMER,KEY_NAME,GROUP_NAME);
测试结果
生产者
消费者
消息确认
以上是关于6Redis高级特性(发布和订阅Stream)的主要内容,如果未能解决你的问题,请参考以下文章