Webflux,使用Websocket如何防止订阅两次反应式redis消息操作

Posted

技术标签:

【中文标题】Webflux,使用Websocket如何防止订阅两次反应式redis消息操作【英文标题】:Webflux, with Websocket how to prevent subscribing twice of reactive redis messaging operation 【发布时间】:2021-12-31 04:31:50 【问题描述】:

我在 webflux 上有一个使用 redis 消息传递操作的 websocket 实现。它的作用是监听主题并通过 websocket 端点返回值。

我遇到的问题是每次用户通过 websocket 向端点发送消息时,似乎都进行了全新的 redis 订阅,导致 redis 消息主题上的订阅者积累,并且 websocket 响应随着数量的增加而增加redis 主题消息订阅次数也是如此(示例用户发送 3 条消息,redis 主题订阅增加到 3 次,websocket 连接响应 3 次)。

想知道是否有办法重用消息主题的相同订阅,这样可以防止多个 redis 主题订阅。

我使用的代码如下:

Websocket 处理程序

public class SendingMessageHandler implements WebSocketHandler 
  private final Gson gson = new Gson();

  private final MessagingService messagingService;

  public SendingMessageHandler(MessagingService messagingService) 
      this.messagingService = messagingService;
  

  @Override
  public Mono<Void> handle(WebSocketSession session) 
      Flux<WebSocketMessage> stringFlux = session.receive()
              .map(WebSocketMessage::getPayloadAsText)
              .flatMap(inputData ->
                      messagingService.playGame(inputData)
                              .map(data ->
                                      session.textMessage(gson.toJson(data))
                              )
              );

      return session.send(stringFlux);
  
 

消息处理服务

公共类消息服务 private final ReactiveRedisOperations reactiveRedisOperations;

  public MessagingService(ReactiveRedisOperations<String, GamePubSub> reactiveRedisOperations) 
      this.reactiveRedisOperations = reactiveRedisOperations;
  


  public Flux<Object> playGame(UserInput userInput)
      return reactiveRedisOperations.listenTo("TOPIC_NAME");
  

提前谢谢你。

【问题讨论】:

【参考方案1】:

而不是使用ReactiveRedisOperationsMessageListener 是这里的方式。您可以注册一次监听器,并将以下内容用作监听器。

data -> session.textMessage(gson.toJson(data))

注册应该只在连接开始时发生一次。您可以覆盖 SendingMessageHandlervoid afterConnectionEstablished(WebSocketSession session) 来完成此操作。这样,每个新的 Websocket 连接、每条消息都会创建一个新订阅。

另外,不要忘记覆盖afterConnectionClosed,并取消订阅redis主题,并清理其中的监听器。

Instructions on how to use MessageListener.

【讨论】:

以上是关于Webflux,使用Websocket如何防止订阅两次反应式redis消息操作的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Webflux 上打开 Websocket 时发送消息

如何将 Spring Webflux Websocket 路由作为注释?

将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?

Spring WebFlux 基础教程:WebSocket 使用

Spring WebFlux 和 WebSocket

如何在 Spring 5 webflux websocket 客户端上更改帧/缓冲区大小