Spring WebFlux:WebSocketSession.send() 不发送消息

Posted

技术标签:

【中文标题】Spring WebFlux:WebSocketSession.send() 不发送消息【英文标题】:Spring WebFlux: WebSocketSession.send() doesn't send messages 【发布时间】:2021-11-16 18:41:54 【问题描述】:

你好 我正在尝试使用 Spring WebFlux 创建 webscoket 端点。我希望这个端点返回一些事件。 为此,我创建了事件的 ConnectableFlux,并在 handle(..) 方法中将其映射到 Flux。但是在我把它交给 WebSocketSession 之后,什么都没有发生——websocket 客户端没有收到任何东西。但与此同时,您可以在下面我的 handle(..) 方法中看到的 println(event.toString()) 实际上将信息打印到控制台。 你能告诉我我错过了什么吗?

public class EventWebsocketHandler implements WebSocketHandler 

    //  constructors and etc.

    @Override
    public Mono<Void> handle(WebSocketSession session) 
        ObjectMapper objectMapper = new ObjectMapper();

        Flux<WebSocketMessage> messages = eventService.events()
                .flatMap(event -> 
                    try 
                        System.out.println(event.toString());
                        return Mono.just(objectMapper.writeValueAsString(event));
                     catch (JsonProcessingException e) 
                        return Mono.error(e);
                    
                )
                .map(session::textMessage);

        return session.send(messages);
    

@Service
public class EventService 

    List<EventDto> events = new ArrayList<>();

    private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> 
        while (true) 
            if (!events.isEmpty()) 
                fluxSink.next(events.get(0));
                events.remove(0);
            
        
    )
            .publish()
            .autoConnect();

    public void push(EventDto event) 
        events.add(event);
    

    public Flux<EventDto> events() 
        return eventFlux;
    


我的项目中有另一个 WebSocketHandler,它工作正常,这意味着配置一切正常:


public class MyWebSocketHandler implements WebSocketHandler 

    @Override
    public Mono<Void> handle(WebSocketSession session) 
        Flux<Long> source = Flux.interval(Duration.ofMillis(1000 * 3));
        return session.send(source.map(l -> session.textMessage(String.valueOf(l))));
    


【问题讨论】:

【参考方案1】:

这个

    private final Flux<EventDto> eventFlux = Flux.<EventDto>create(fluxSink -> 
    while (true) 
        if (!events.isEmpty()) 
            fluxSink.next(events.get(0));
            events.remove(0);
        
    
)
        .publish()
        .autoConnect();

必须用这个替换

private final Sinks.Many<EventDto> processor = Sinks.many().multicast().onBackpressureBuffer();

【讨论】:

以上是关于Spring WebFlux:WebSocketSession.send() 不发送消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring Webflux - 03 Webflux编程模型

Spring Webflux - 03 Webflux编程模型

Spring Webflux - 03 Webflux编程模型

Spring-WebFlux使用,一文带你从0开始学明白Spring-WebFlux,学明白响应式编程

Spring WebFlux

Spring @Async 与 Spring WebFlux