基于Redis的发布/订阅模式实现者:RedisQ
Posted 街头小贩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Redis的发布/订阅模式实现者:RedisQ相关的知识,希望对你有一定的参考价值。
Redis中是存在发布/订阅功能的,哪这个github的项目是意欲何为!RedisQ项目的主页上作了以下说明:
What Redis offers with Pub/Sub is a listener model, where each subscriber receives each messages when it is listening, but won’t receive them when not connected. We want every consumer to eventually receive all messages, independently of their online or offline status.
In a clustered environment where you have multiple instances of your consumer component (application) running at the same time, each instance would receive each message produced on the channel. This library makes sure any given message is consumed once per logical consumer, even when multiple instances of this component are running.
根据官方的使用说明,几天实验下来的总结如下:
A: RedisMessageQueue
实例化后. 需要一个队列名称,Spring Bean:
@Bean("boardCreateQueue")
public RedisMessageQueue boardCreateQueue()
RedisMessageQueue mq = new com.github.davidmarquis.redisq.RedisMessageQueue();
mq.setQueueName("core:BoardCreateEvent");
return mq;
注意:
RedisMessageQueue 并不是一个泛型类, 一个队列存储一种事件类型,不然事件会重发(一个队列存多种类型的事件)。
B: MessageProducer
一个泛型类, 类型参数为事件类型. 实例后需要设置队列实例, Spring Bean:
@Bean("boardCreateProducer")
public MessageProducer<BoardCreateEvent> boardCreateProducer(@Qualifier("boardCreateQueue") RedisMessageQueue boardCreateQueue)
DefaultMessageProducer<BoardCreateEvent> dmq = new com.github.davidmarquis.redisq.producer.DefaultMessageProducer<>();
dmq.setQueue(boardCreateQueue);
return dmq;
事件的发布由这个消息的生产者来实施, 业务层注入后如下调用:
@Service
public class BoardServiceImpl implements BoardService
@Autowired @Qualifier("boardCreateProducer")
private MessageProducer<BoardCreateEvent> boardCreateProducer;
//方法名
public Optional<Board> create(...)
//ETC
boardCreateProducer.create(new BoardCreateEvent(board)).submit();
注意: 一个队列存储一种类型的事件,一种类型的事件可以由多个生产者生产, 一个队列有一到多个消息者, 默认是多个若只能有一个消息者需要单独配置
C: MessageListener && MessageConsumer
MessageConsumer中需要一个MessageListener ,MessageListener 的实现类中可以处理事件。 MessageConsumer配置如下:
@Bean
public MessageConsumer<BoardCreateEvent> boardStatsEntryConsumer(
@Qualifier("boardCreateQueue") RedisMessageQueue boardCreateQueue,
@Qualifier("boardStatsEntry") BoardStatsListener boardStatsEntry)
MessageConsumer<BoardCreateEvent> messageConsumer = new com.github.davidmarquis.redisq.consumer.MessageConsumer<>();
messageConsumer.setQueue(boardCreateQueue);
messageConsumer.setConsumerId("BoardCreateEvent:statsEntry");
messageConsumer.setMessageListener(boardStatsEntry);
return messageConsumer;
BoardStatsListener 实现了MessageListener 这个泛型接口. MessageConsumer也支持多线程,官方有一个配置示例:
<bean id="messageConsumer" class="com.github.davidmarquis.redisq.consumer.MessageConsumer">
<property name="queue" ref="myQueue" />
<property name="consumerId" value="someConsumerId" />
<property name="messageListener" ref="messageListener"/>
<property name="retryStrategy">
<bean class="com.github.davidmarquis.redisq.consumer.retry.MaxRetriesStrategy">
<constructor-arg name="maxRetries" value="2"/>
</bean>
</property>
</bean>
注意:
MessageConsumer的ConsumerId不能重复, 以此来区分不同的订阅者. 项目启动后都会生成消费者的redis的key. 如下图:
若某队列存在多个订阅者如下图:
当一条消息由生产者(MessageProducer)发到队列以后会多一条redis记录代表消息(Message 一个泛型类), 消息的载体即为事件类。如下图
上图的Map由RedisOps.MessageConverter实现类的toMap方法生成. 项目默认使用DefaultMessageConverter的toMap方法,源码如下:
public Map<String, String> toMap(Message message, PayloadSerializer payloadSerializer)
String payloadAsText = payloadSerializer.serialize(message.getPayload());
Map<String, String> data = new HashMap<String, String>(4);
data.put(ID_HASH_KEY, message.getId());
data.put(CREATION_HASH_KEY, Long.toString(message.getCreation().getTime().getTime()));
data.put(PAYLOAD_HASH_KEY, payloadAsText);
data.put(RETRY_COUNT, Integer.toString(message.getRetryCount()));
if (message.getTimeToLiveSeconds() != null)
data.put(TTL_HASH_KEY, Long.toString(message.getTimeToLiveSeconds()));
return data;
可见载荷(payload)的内容是由PayloadSerializer 的实现类来作主要参与方. 这里提及主要因为官方默认使用的是:JaxbPayloadSerializer,若无法完成转换还有其它两种选择: GsonPayloadSerializer和StringPayloadSerializer。 若要变更只需要在spring中声明即可, 例:
@Bean
public PayloadSerializer payloadSerializer()
return new com.github.davidmarquis.redisq.serialization.GsonPayloadSerializer();
@Bean("redisOps")
public RedisOps buildRedisMQOps()
return new com.github.davidmarquis.redisq.persistence.RedisOps();
可能有人会有疑问?
1: 它怎么写入的?
使用RedisTemplate ! 所有的读和写都由RedisOps来托底. 而这个redisTemplate属性上有Autowired注解. 只要在Spring Bean工厂中存在即可. 所以不需要在buildRedisMQOps中set redisTemplate
2: PayloadSerializer 变更后会起作用吗?
会 ! 原理同上. 源码如下
public class RedisOps
@Autowired
private RedisTemplate redisTemplate;
@Autowired(required = false)
private PayloadSerializer payloadSerializer = new JaxbPayloadSerializer();
@Autowired(required = false)
private MessageConverter messageConverter = new DefaultMessageConverter();
3:参考示例
forum项目的jdk 11+版本, 使用SSJ模块化的小论坛项目
D: 总结
若你的项目使用了redis作缓存, 又需要一个轻量的MQ来进行消息传递,不需要单独跑一个MQ产品(ActiveMQ, Kafka, …), 这个项目是不错的选择. 目前发现一个最大的使用问题是配置太繁杂,若是基于spring的自动装配哪太好了. 希望哪位大神完成了此项工作能告诉我一声!项目也有3年多没更新了!
以上是关于基于Redis的发布/订阅模式实现者:RedisQ的主要内容,如果未能解决你的问题,请参考以下文章