Spring WebFlux 反应式 WebSocket 防止连接关闭

Posted

技术标签:

【中文标题】Spring WebFlux 反应式 WebSocket 防止连接关闭【英文标题】:Spring WebFlux reactive WebSocket prevent connection closing 【发布时间】:2018-05-07 04:05:26 【问题描述】:

我正在使用 Spring WebFlux 为我的应用程序开发简单的聊天模块,后端使用 ReactiveMongoRepository,前端使用 Angular 4。我能够通过 WebSocketSession 接收数据,但是在从 db 流式传输所有消息之后,我想保持连接,以便我可以更新消息列表。谁能给我线索如何实现这一点,或者我可能遵循了错误的假设?

Java 后端负责 WebSocket,我的订阅者只记录当前状态,没有任何相关内容:

WebFlux 配置:

@Configuration
@EnableWebFlux
public class WebSocketConfig 

private final WebSocketHandler webSocketHandler;

@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) 
    this.webSocketHandler = webSocketHandler;


@Bean
@Primary
public HandlerMapping webSocketMapping() 
    Map<String, Object> map = new HashMap<>();
    map.put("/websocket-messages", webSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(10);
    mapping.setUrlMap(map);
    return mapping;


@Bean
public WebSocketHandlerAdapter handlerAdapter() 
    return new WebSocketHandlerAdapter();




WebSocketHandler 实现

@Component
public class MessageWebSocketHandler implements WebSocketHandler 

private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();

@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) 
    this.messageRepository = messageRepository;


@Override
    public Mono<Void> handle(WebSocketSession session) 
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toMessage)
            .subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete);
    return session.send(
            messageRepository.findAll()
                    .map(this::toJSON)
                    .map(session::textMessage));


private String toJSON(Message message) 
    try 
        return mapper.writeValueAsString(message);
     catch (JsonProcessingException e) 
        throw new RuntimeException(e);
    


private Message toMessage(String json) 
    try 
        return mapper.readValue(json, Message.class);
     catch (IOException e) 
        throw new RuntimeException("Invalid JSON:" + json, e);
    


和 MongoRepo

@Repository
public interface MessageRepository extends 
ReactiveMongoRepository<Message,String> 

前端处理:

@Injectable()
export class WebSocketService 
  private subject: Rx.Subject<MessageEvent>;

  constructor() 
  

  public connect(url): Rx.Subject<MessageEvent> 
    if (!this.subject) 
      this.subject = this.create(url);
      console.log('Successfully connected: ' + url);
    
    return this.subject;
  

  private create(url): Rx.Subject<MessageEvent> 
    const ws = new WebSocket(url);
    const observable = Rx.Observable.create(
      (obs: Rx.Observer<MessageEvent>) => 
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      );
    const observer = 
      next: (data: Object) => 
        if (ws.readyState === WebSocket.OPEN) 
          ws.send(JSON.stringify(data));
        
      
    ;
    return Rx.Subject.create(observer, observable);
  

在其他服务中,我从响应中映射可观察到的类型

  constructor(private wsService: WebSocketService) 
    this.messages = <Subject<MessageEntity>>this.wsService
      .connect('ws://localhost:8081/websocket-messages')
      .map((response: MessageEvent): MessageEntity => 
        const data = JSON.parse(response.data);
        return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
      );
  

最后订阅了由于连接关闭而无法使用的发送功能:

  ngOnInit() 
    this.messages = [];
    this._ws_subscription = this.chatService.messages.subscribe(
      (message: MessageEntity) => 
        this.messages.push(message);
      ,
      error2 => 
        console.log(error2.json());
      ,
      () => 
        console.log('Closed');
      
    );
  

  sendTestMessage() 
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
  

【问题讨论】:

【参考方案1】:

假设您的聊天消息在接收时被保存到数据存储区,您可以使用 Spring Data MongoDB Reactive 中的可尾游标功能(请参阅reference documentation)。

因此,您可以在存储库中创建一个新方法,例如:

public interface MessageRepository extends ReactiveSortingRepository< Message, String> 

    @Tailable
    Flux<Message> findWithTailableCursor();

请注意,可尾游标有一些限制:您的 mongo 集合需要设置上限,并且条目按插入顺序流式传输。

Spring WebFlux websocket 支持尚不支持 STOMP 或消息代理,但这可能是此类用例的更好选择。

【讨论】:

好的,所以基本上只使用 WebFlux 是不可能保持连接以进行通知的,对吧?这对我来说是一个研究项目,所以有点令人失望,谢谢你的替代方案,我一定会去看看。 我没这么说。只要源Flux 保持打开状态,您就可以使连接保持打开状态。在您的情况下,存储库在完成流式传输条目时会发送 onComplete 信号。否则,您怎么知道数据库中是否有更多结果?你在这里寻找一个无限流用例,你需要一个像这样表现的源。 哦,好的,我现在明白了,我查看了文档,@Tailable 注释似乎是我正在寻找的答案。再次感谢,现在我可以继续了;D

以上是关于Spring WebFlux 反应式 WebSocket 防止连接关闭的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2 反应式 webflux

[转] 使用 Spring 5 的 WebFlux 开发反应式 Web 应用

使用 @EnableResourceServer 支持 Spring Boot 反应式 (webflux)

每日阅读2020年7月14日-使用 Spring 5 的 WebFlux 开发反应式 Web 应用

Spring webflux 反应式 Mono::subscribe

Spring WebFlux 教程:如何构建反应式 Web 应用程序