SpringBoot/Spring中建立WebSocket连接(STOMP)

Posted zhenghuasheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot/Spring中建立WebSocket连接(STOMP)相关的知识,希望对你有一定的参考价值。

STOMP协议介绍

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。

它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。

由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的STOMP消息代理是Apache ActiveMQ。

STOMP协议工作于TCP协议之上,使用了下列命令:

  • SEND 发送
  • SUBSCRIBE 订阅
  • UNSUBSCRIBE 退订
  • BEGIN 开始
  • COMMIT 提交
  • ABORT 取消
  • ACK 确认
  • DISCONNECT 断开

STOMP Over WebSocket:http://jmesnil.net/stomp-websocket/doc/

1,SpringBoot添加基于STMOP协议的WebSocket支持

1.1,添加pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

1.2,配置 websocket stomp

/**
 * 通过EnableWebSocketMessageBroker 开启使用STOMP协议来传输基于代理(message broker)的消息,此时浏览器支持使用@MessageMapping 就像支持@RequestMapping一样。
 * @author zhenghuasheng
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer

    /**
     * 注册stomp的端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/endpointService").setAllowedOrigins("*").withSockJS();
    

    /**
     * 配置消息代理(message broker)
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) 
        // 订阅Broker名称
        registry.enableSimpleBroker("/queue","/topic");
        // 全局使用的消息前缀(客户端订阅路径上会体现出来)
//        registry.setApplicationDestinationPrefixes("/app");
        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        // registry.setUserDestinationPrefix("/user/");
    

说明:

  • 用户订阅主题的前缀
registry.enableSimpleBroker("/queue","/topic");

/topic 代表发布广播,即群发
/queue 代表点对点,即发指定用户

  • 全局使用的消息前缀(客户端订阅路径上会体现出来)
registry.setApplicationDestinationPrefixes("/app");

例如客户端发送消息的目的地为/app/send,则对应控制层@MessageMapping(“/send”)
客户端订阅主题的目的地为/app/subscribe,则对应控制层@SubscribeMapping(“/subscribe”)

  • 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user/");

1.3,消息实体类

  • 客户端发往服务器消息实体
/**
 * 客户端发往服务器消息实体
 * @author zhenghuasheng
 */
public class Message 
    private String name;

    public String getName() 
        return name;
    
  • 服务器发往客户端的消息实体
/**
 * 服务器发往客户端消息实体
 * @author zhenghuasheng
 */
public class Response 
    public void setResponseMessage(String responseMessage) 
        this.responseMessage = responseMessage;
    

    private String responseMessage;

    public Response(String responseMessage) 
        this.responseMessage = responseMessage;
    

    public String getResponseMessage() 
        return responseMessage;
    

1.4,控制类测试

/**
 * @author zhenghuasheng
 */
@Controller
public class WebSocketController 
    @Autowired
    SimpMessagingTemplate template;

    @Autowired
    WelcomeJob welcomeTask;

    private static final Logger logger = LoggerFactory.getLogger(WebSocketController.class);

    /**
     *  浏览器发送请求通过@messageMapping 映射/welcome 这个地址。
     *  服务器端有消息时,会订阅@SendTo 中的路径的浏览器发送消息。
     * @param message
     * @return
     * @throws Exception
     */
    @MessageMapping("/welcome") 
    @SendTo("/topic/getResponse") 
    public Response say(Message message) throws Exception 
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    

    /**
     * 当有客户端订阅"/topic/getResponse",会收到消息
     * @return
     */
    @SubscribeMapping("/topic/getResponse")
    public Response sub() 
        logger.info("XXX用户订阅了我。。。");
        return new Response("感谢你订阅了我。。。");
    

    @GetMapping("/test")
    String test() 
        return "test";
    

    /**
     * 可以利用普通http request来主动推送广播消息
     * @return
     */
    @RequestMapping("/welcome")
    @ResponseBody
    public ResultVo say02() 
        try 
            template.convertAndSend("/topic/getResponse", new Response("欢迎!" ));
         catch (Exception e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        
        return ResultVo.ok();
    

使用点对点定向推送消

  • 更改端点注册,新增注册标示,这样服务器才能分辨推送的目的地
    /**
     * 注册stomp的端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/endpointWisely")
         //.addInterceptors(new WebSocketSessionHandshakeInterceptor())
                .setHandshakeHandler(new DefaultHandshakeHandler()
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) 
                        return new LocalPrincipal(ShiroUtils.getUser().getName());
                    
                )
                .setAllowedOrigins("*").withSockJS();
    

本例使用了shiro,使用用户名作为标示,,,可以设置为其他的唯一标示   只需修改 return new LocalPrincipal(ShiroUtils.getUser().getName());构造方法中的值。因为带有request和attributes,完全可以自定义标示数据,注册时如果想扩展参数或者自定义  只需新增握手拦截器HandshakeInterceptor

例如:通过前端传递的userId来标示,将获取的参数放入attributes中,注册时在该对象中获取
/**
 * Created by zhenghuasheng on 2016/6/20.
 */
public class WebSocketSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor 
    Logger logger = LoggerFactory.getLogger(WebSocketSessionHandshakeInterceptor.class);

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception 
        if (getSession(request) != null) 
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
            attributes.put("userId", httpServletRequest.getParameter("userId"));
        
        super.beforeHandshake(request, response, wsHandler, attributes);
        return true;
    

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) 
        super.afterHandshake(request, response, wsHandler, ex);
    
    private HttpSession getSession(ServerHttpRequest request) 
        if (request instanceof ServletServerHttpRequest) 
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            return serverRequest.getServletRequest().getSession();
        
        return null;
    






/**
 * @author zhenghuasheng
 * @date 2018/3/27.14:31
 */
public class LocalPrincipal implements Principal 

    private String username;

    @Override
    public String getName() 
        return username;
    


    public LocalPrincipal(String username) 
        this.username = username;
    

    public LocalPrincipal() 
    

1.5,客户端

  • 引入js文件
<script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
弹框使用 Toastr,自己引入相关js文件
<script src="/js/plugins/toastr/toastr.min.js"></script>
  var stompClient = null;
    $(function () 
        connect();
    );

    function sendMessage() 
        stompClient.send("/welcome", , JSON.stringify('name': "123456"));
    
   function connect() 
        var socket = new SockJS('/endpointService'); //链接SockJS 的endpoint 名称为"/endpointService",对应config中的站点名称
        stompClient = Stomp.over(socket);//使用stomp子协议的WebSocket 客户端
        stompClient.connect(, function (frame) //链接Web Socket的服务端。
            console.log('Connected: ' + frame);
            stompClient.subscribe('/user/queue/msg', function (response) 
                var returnData = JSON.parse(response.body);
                console.log(returnData)
                toastr.options = 
                    "closeButton": true,
                    "debug": false,
                    "progressBar": true,
                    "positionClass": "toast-top-center",
                    "onclick": null,
                    "showDuration": "400",
                    "hideDuration": "1000",
                    "timeOut": "7000",
                    "extendedTimeOut": "1000",
                    "showEasing": "swing",
                    "hideEasing": "linear",
                    "showMethod": "fadeIn",
                    "hideMethod": "fadeOut"
                
                toastr.info(JSON.parse(response.body).responseMessage);
            );
            stompClient.subscribe('/topic/getResponse', function (response)  //订阅/topic/getResponse 目标发送的消息。这个是在控制器的@SendTo中定义的。
                toastr.options = 
                    "closeButton": true,
                    "debug": false,
                    "progressBar": true,
                    "positionClass": "toast-top-center",
                    "onclick": null,
                    "showDuration": "400",
                    "hideDuration": "1000",
                    "timeOut": "7000",
                    "extendedTimeOut": "1000",
                    "showEasing": "swing",
                    "hideEasing": "linear",
                    "showMethod": "fadeIn",
                    "hideMethod": "fadeOut"
                
                toastr.info(JSON.parse(response.body).responseMessage);
            );
        );
    

2,Spring添加基于STMOP协议的WebSocket支持

2.1 新增pom依赖,在原spring基础上新增

 <dependency>  
     <groupId>org.springframework</groupId>  
      <artifactId>spring-websocket</artifactId>  
      <version>$spring.version</version>  
 </dependency>  

 <dependency>  
    <groupId>org.springframework</groupId>  
    <artifactId>spring-messaging</artifactId>  
    <version>$spring.version</version>  
 </dependency>  

2.2 客户端、实体类与配置与SpringBoot的相关配置相同

spring可以使用xml形式配置站点服务

<websocket:message-broker application-destination-prefix="app" user-destination-prefix="user" >
    <websocket:stomp-endpoint allowed-origins="*" path="/webSocketServer">
        <websocket:handshake-interceptors>
            <ref bean="webSocketSessionHandshakeInterceptor"></ref>
        </websocket:handshake-interceptors>
        <websocket:sockjs/>
    </websocket:stomp-endpoint>

    <websocket:stomp-broker-relay prefix="/topic,/queue" heartbeat-receive-interval="2000" heartbeat-send-interval="2000"/>
</websocket:message-broker>

3,客户端向服务端传递参数

  • 1,发送示例,通过header传递
  function sendMessage() 
        stompClient.send("/welcome", name:zhs, JSON.stringify('name': "123456"));
    
  • 1,接受示例
    @MessageMapping("/welcome")
    @SendTo("/topic/getResponse")
    public Response say(Message message, @Header("name") String name) throws Exception 
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    

2,发送示例,路径参数传递

  function sendMessage() 
        stompClient.send("/welcome/zhs", , JSON.stringify('name': "123456"));
    

2,接受示例

    @MessageMapping("/welcome/name")
    @SendTo("/topic/getResponse")
    public Response say(Message message, @DestinationVariable("name") String name) throws Exception 
        Thread.sleep(1000);
        return new Response("Welcome, " + message.getName() + "!");
    

以上是关于SpringBoot/Spring中建立WebSocket连接(STOMP)的主要内容,如果未能解决你的问题,请参考以下文章

Springboot:Spring Boot 之 HelloWorld

Firefox 无法建立到 WSS 的连接

neo4 WebSocket 连接到“ws://localhost:7687/”失败:通过代理服务器建立隧道失败

Spring Boot 入门

SpringBoot:Spring容器的启动过程

springboot整合websocket