通过 WebSocket 连接的 Spring SseEmitter

Posted

技术标签:

【中文标题】通过 WebSocket 连接的 Spring SseEmitter【英文标题】:Spring SseEmitter over WebSocket connection 【发布时间】:2016-03-11 07:52:49 【问题描述】:

我的目标是从前端向后端发送一个请求并接收多个响应。我使用 WebSocket 是因为响应非常频繁,而 WebSocket 似乎是最好的协议,并且 SseEmitter 从后端发送多个响应。

这是我的请求控制器:

@MessageMapping("/emitter")
@SendTo("/topic/response")
public SseEmitter output(RunData runData) throws Exception 
    SseEmitter emitter = new SseEmitter();
    new Thread(new Runnable() 

        @Override
        public void run() 
            try 
                RemoteHostController rhc = new RemoteHostController(runData);
                rhc.execute();
                while (rhc.getActiveCount() > 0) 
                    emitter.send(rhc.getAllOutput());
                    Thread.sleep(2000);
                

                 emitter.complete();
             catch (Exception ee) 
                ee.printStackTrace();
                emitter.completeWithError(ee);
            
        
    ).start();

    return emitter;

RemoteHostController 正在管理连接,getAllOutput 从主机返回输出。

前端应用程序正在运行非常简单的 index.html,它使用 Stomp 和 SockJS 连接到 websocket,将数据发送到服务器并生成

带有来自响应的数据的标签:

function connect() 
        var socket = new SockJS('http://localhost:8080/emitter');
        stompClient = Stomp.over(socket);
        stompClient.connect(, function(frame) 
            setConnected(true);
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/response', function(greeting)
                showOutput(greeting.body);
            );
        );


function sendData() 
        var hostname = document.getElementById('hostname').value;
        var username = document.getElementById('username').value;
        var password = document.getElementById('password').value;
        var command = document.getElementById('command').value;
        stompClient.send("/app/emitter", , JSON.stringify( 'hostname': hostname,
                                                    'username': username,
                                                    'password': password,
                                                    'command': command));


function showOutput(message) 
        var response = document.getElementById('response');
        var p = document.createElement('p');
        p.style.wordWrap = 'break-word';
        p.appendChild(document.createTextNode(message));
        response.appendChild(p);

当我将数据发送到后端时,我得到的唯一响应是:

"timeout":null

这是 SseEmitter 超时字段,当我更改超时时,它将返回 "timeout":<timeout_value>

我可以在日志中看到 RemoteHostController 正在连接到主机并正确执行命令。

我做错了吗?还是WebSocket只支持一请求一响应通信?

【问题讨论】:

您看到超时字段的原因是因为您的控制器方法正在返回一个 SseEmitter 并且这就是发送到 SendTo 目的地的内容。 Websockets 可以异步发送,因此没有实际的请求/响应。我对 SSE 不熟悉,但您似乎需要在客户端使用 EventSource(请参阅 coderanch.com/t/658238/Spring/… 作为示例。我在我们编写的应用程序中使用了 WebSockets,您使用 SimpMessagingTemplate 在后端发送内容。如果时间允许,我可能会发布解决方案 我有一个SseEmitter 的示例,但它是通过EventSource 将流数据发送到此Project 中的客户端。您可以在那里查看 SseEmitter 代码。由于 Internet Explorer 不支持 EventSource,因此我稍后将移至 WebSocket。另外我认为我不需要SseEmitterWebSocket 一起使用,我认为它通过@Rob Baily 指出的EventSource 发送数据 【参考方案1】:

这是 WebSocket 和 SSE 的示例。如上所述,IE 浏览器不支持 SSE。为了完整起见,我尽可能多地添加。确保在使用 SeeEmitter 时没有使用 RestController,因为这将返回对象,这是我从上面描述中的猜测。

pom.xml

<dependencies>
    <!-- Spring boot framework -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

Web 套接字配置:

@Configuration
@EnableWebSocketMessageBroker
public class ApplicationWebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer 

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) 
        super.configureMessageBroker(registry);
        registry.enableSimpleBroker("/topic");
    

    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) 
        stompEndpointRegistry.addEndpoint("/socketrequest").withSockJS();
    

请求数据:

public class RequestData 
    private String string1;
    private String string2;
    // excluding getters and setters

Web 套接字控制器:

@Controller
public class WebSocketController 
   @Autowired
    SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/proces-s-request")
    void runWebSocket( RequestData requestData ) 
        new Thread(new RunProcess(requestData)).start();
    

    private class RunProcess implements Runnable 
        private RequestData requestData;

        RunProcess(RequestData requestData) 
            this.requestData = requestData;
        

        public void run() 
            simpMessagingTemplate.convertAndSend("/topic/response", requestData.getString1());
            simpMessagingTemplate.convertAndSend("/topic/response", requestData.getString2());
            simpMessagingTemplate.convertAndSend("/topic/response", "A third response via websocket");
        
    

Sse 控制器:

@Controller
public class SseController 

    @RequestMapping("/emitter")
    public SseEmitter runEmitter(@RequestParam(value = "string1") String string1,
                                 @RequestParam(value = "string2") String string2)
    
        SseEmitter sseEmitter = new SseEmitter();
        RequestData requestData = new RequestData();
        requestData.setString1(string1);
        requestData.setString2(string2);
        new Thread(new RunProcess(requestData,sseEmitter)).start();
        return sseEmitter;
    

    private class RunProcess implements Runnable 
        private RequestData requestData;
        private SseEmitter sseEmitter;

        RunProcess(RequestData requestData, SseEmitter sseEmitter) 
            this.requestData = requestData;
            this.sseEmitter = sseEmitter;
        

        public void run() 
            try 
                sseEmitter.send(requestData.getString1());
                sseEmitter.send(requestData.getString2());
                sseEmitter.send("A third response from SseEmitter");
                sseEmitter.complete();
             catch (IOException e) 
                e.printStackTrace();
                sseEmitter.completeWithError(e);
            
        
    


HTML代码:

    <script src="/javascript/sockjs-0.3.4.js"></script>
    <script src="/javascript/stomp.js"></script>

    <script type="text/javascript">
    var stompClient = null;

    function connect() 
        var socket = new SockJS('http://localhost:8085/socketrequest');
        stompClient = Stomp.over(socket);
        stompClient.connect(, function(frame) 
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/response', function(message)
                showOutput(message.body);
            );
        );
    

    function doWebsocket() 
        stompClient.send("/proces-s-request", , JSON.stringify( 'string1': 'The first string', 'string2' : 'The second string' ));
    


    function doSse() 
        console.log("doSse");
        var rtUrl= '/emitter?string1=first string sse&string2=second string sse';
        var source = new EventSource(rtUrl);
        source.onmessage=function(event)
            showOutput(event.data)
        ;
    

    function showOutput(message) 
        var response = document.getElementById('response');
        var p = document.createElement('p');
        p.style.wordWrap = 'break-word';
        p.appendChild(document.createTextNode(message));
        response.appendChild(p);
    

    connect();

    </script>
    </head>

<div>
Starting page
</div>
<div>
    <button id="websocket" onclick="doWebsocket();">WebSocket</button>
    <button id="sse" onclick="doSse();">Server Side Events</button>
</div>
<div >
    Response:
    <p id="response"></p>
</div>

</html>

【讨论】:

以上是关于通过 WebSocket 连接的 Spring SseEmitter的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot配置WebSocket

spring对websocket的集成和使用

Spring WebFlux 反应式 WebSocket 防止连接关闭

java - 如何使用simplebroker或rabbitMQ和java spring在websocket中获取所有连接的用户

spring websocket断网服务器怎么响应

带有 Spring-boot 后端的 Flutter websocket