Webflux,使用Websocket如何防止订阅两次反应式redis消息操作
Posted
技术标签:
【中文标题】Webflux,使用Websocket如何防止订阅两次反应式redis消息操作【英文标题】:Webflux, with Websocket how to prevent subscribing twice of reactive redis messaging operation 【发布时间】:2021-12-31 04:31:50 【问题描述】:我在 webflux 上有一个使用 redis 消息传递操作的 websocket 实现。它的作用是监听主题并通过 websocket 端点返回值。
我遇到的问题是每次用户通过 websocket 向端点发送消息时,似乎都进行了全新的 redis 订阅,导致 redis 消息主题上的订阅者积累,并且 websocket 响应随着数量的增加而增加redis 主题消息订阅次数也是如此(示例用户发送 3 条消息,redis 主题订阅增加到 3 次,websocket 连接响应 3 次)。
想知道是否有办法重用消息主题的相同订阅,这样可以防止多个 redis 主题订阅。
我使用的代码如下:
Websocket 处理程序
public class SendingMessageHandler implements WebSocketHandler
private final Gson gson = new Gson();
private final MessagingService messagingService;
public SendingMessageHandler(MessagingService messagingService)
this.messagingService = messagingService;
@Override
public Mono<Void> handle(WebSocketSession session)
Flux<WebSocketMessage> stringFlux = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(inputData ->
messagingService.playGame(inputData)
.map(data ->
session.textMessage(gson.toJson(data))
)
);
return session.send(stringFlux);
消息处理服务
公共类消息服务
private final ReactiveRedisOperations
public MessagingService(ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations)
this.reactiveRedisOperations = reactiveRedisOperations;
public Flux<Object> playGame(UserInput userInput)
return reactiveRedisOperations.listenTo("TOPIC_NAME");
提前谢谢你。
【问题讨论】:
【参考方案1】:而不是使用ReactiveRedisOperations
,MessageListener
是这里的方式。您可以注册一次监听器,并将以下内容用作监听器。
data -> session.textMessage(gson.toJson(data))
注册应该只在连接开始时发生一次。您可以覆盖 SendingMessageHandler
的 void afterConnectionEstablished(WebSocketSession session)
来完成此操作。这样,每个新的 Websocket 连接、每条消息都会创建一个新订阅。
另外,不要忘记覆盖afterConnectionClosed
,并取消订阅redis主题,并清理其中的监听器。
Instructions on how to use MessageListener.
【讨论】:
以上是关于Webflux,使用Websocket如何防止订阅两次反应式redis消息操作的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Webflux 上打开 Websocket 时发送消息
如何将 Spring Webflux Websocket 路由作为注释?
将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?