spring boot下WebSocket消息推送
Posted 一杯苦茶ol
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring boot下WebSocket消息推送相关的知识,希望对你有一定的参考价值。
WebSocket协议
WebSocket是一种在单个TCP连接上进行全双工通讯的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输
STOMP协议
STOMP是面向文本的消息传送协议。STOMP客户端与支持STOMP协议的消息代理进行通信。STOMP使用不同的命令,如连接,发送,订阅,断开等进行通信。
具体参考:官方介绍
SockJS
SockJS是一个javascript库,提供跨浏览器JavaScript的API,创建了一个低延迟、全双工的浏览器和web服务器之间通信通道
以上内容出自维基百科和百度百科
使用websocket有两种方式:1是使用sockjs,2是使用h5的标准。使用html5标准自然更方便简单,所以记录的是配合h5的使用方法。
1、pom引入
核心是@ServerEndpoint这个注解。这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,只要在pom文件中引入javaee标准即可使用。
1 <dependency> 2 <groupId>javax</groupId> 3 <artifactId>javaee-api</artifactId> 4 <version>7.0</version> 5 <scope>provided</scope> 6 </dependency>
但使用springboot的内置tomcat时,就不需要引入javaee-api了,spring-boot已经包含了。使用springboot的websocket功能首先引入springboot组件。
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-websocket</artifactId> 4 </dependency>
2、使用@ServerEndpoint创立websocket endpoint
首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。
1 @Configuration 2 public class WebSocketConfig { 3 @Bean 4 public ServerEndpointExporter serverEndpointExporter() { 5 return new ServerEndpointExporter(); 6 } 7 8 }
下面事websocket的具体实现方法,代码如下:
1 import org.springframework.stereotype.Component; 2 3 import javax.websocket.*; 4 import javax.websocket.server.ServerEndpoint; 5 import java.io.IOException; 6 import java.util.concurrent.CopyOnWriteArraySet; 7 8 @ServerEndpoint(value = "/websocket") 9 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理 10 public class WebSocket { 11 //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 12 private static int onlineCount = 0; 13 14 //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 15 private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>(); 16 17 //与某个客户端的连接会话,需要通过它来给客户端发送数据 18 private Session session; 19 20 /** 21 * 连接建立成功调用的方法 22 */ 23 @OnOpen 24 public void onOpen(Session session) { 25 this.session = session; 26 webSocketSet.add(this); //加入set中 27 addOnlineCount(); //在线数加1 28 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); 29 try { 30 sendMessage("Hello world"); 31 } catch (IOException e) { 32 System.out.println("IO异常"); 33 } 34 } 35 36 /** 37 * 连接关闭调用的方法 38 */ 39 @OnClose 40 public void onClose() { 41 webSocketSet.remove(this); //从set中删除 42 subOnlineCount(); //在线数减1 43 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); 44 } 45 46 /** 47 * 收到客户端消息后调用的方法 48 * 49 * @param message 客户端发送过来的消息 50 */ 51 @OnMessage 52 public void onMessage(String message, Session session) { 53 System.out.println("来自客户端的消息:" + message); 54 55 //群发消息 56 for (WebSocket item : webSocketSet) { 57 try { 58 item.sendMessage(message); 59 } catch (IOException e) { 60 e.printStackTrace(); 61 } 62 } 63 } 64 65 /** 66 * 发生错误时调用 67 */ 68 @OnError 69 public void onError(Session session, Throwable error) { 70 System.out.println("发生错误"); 71 error.printStackTrace(); 72 } 73 74 75 public void sendMessage(String message) throws IOException { 76 this.session.getBasicRemote().sendText(message); 77 //this.session.getAsyncRemote().sendText(message); 78 } 79 80 81 /** 82 * 群发自定义消息 83 */ 84 public static void sendInfo(String message) throws IOException { 85 for (WebSocket item : webSocketSet) { 86 try { 87 item.sendMessage(message); 88 } catch (IOException e) { 89 continue; 90 } 91 } 92 } 93 94 public static synchronized int getOnlineCount() { 95 return onlineCount; 96 } 97 98 public static synchronized void addOnlineCount() { 99 WebSocket.onlineCount++; 100 } 101 102 public static synchronized void subOnlineCount() { 103 WebSocket.onlineCount--; 104 } 105 }
使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
3、前端代码
1 <!DOCTYPE HTML> 2 <html> 3 <head> 4 <title>My WebSocket</title> 5 </head> 6 7 <body> 8 Welcome<br/> 9 <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> 10 <div id="message"> 11 </div> 12 </body> 13 14 <script type="text/javascript"> 15 var websocket = null; 16 17 //判断当前浏览器是否支持WebSocket 18 if(‘WebSocket‘ in window){ 19 websocket = new WebSocket("ws://localhost:8084/websocket"); 20 } 21 else{ 22 alert(‘Not support websocket‘) 23 } 24 25 //连接发生错误的回调方法 26 websocket.onerror = function(){ 27 setMessageInnerHTML("error"); 28 }; 29 30 //连接成功建立的回调方法 31 websocket.onopen = function(event){ 32 setMessageInnerHTML("open"); 33 } 34 35 //接收到消息的回调方法 36 websocket.onmessage = function(event){ 37 setMessageInnerHTML(event.data); 38 } 39 40 //连接关闭的回调方法 41 websocket.onclose = function(){ 42 setMessageInnerHTML("close"); 43 } 44 45 //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 46 window.onbeforeunload = function(){ 47 websocket.close(); 48 } 49 50 //将消息显示在网页上 51 function setMessageInnerHTML(innerHTML){ 52 document.getElementById(‘message‘).innerHTML += innerHTML + ‘<br/>‘; 53 } 54 55 //关闭连接 56 function closeWebSocket(){ 57 websocket.close(); 58 } 59 60 //发送消息 61 function send(){ 62 var message = document.getElementById(‘text‘).value; 63 websocket.send(message); 64 } 65 </script> 66 </html>
以上代码,实现了websocket简单消息推送,可以实现两个页面间的消息显示,但是Java后台主动推送消息时,无法获取消息推送的websocket下的session,即无法实现websocket下session的共享。
为解决主动推送的难题,需要在建立连接时,将websocket下的session与servlet下的HttpSession(或者其他session,我们这用到了shiro下的session)建立关联关系。
webSocket配置Java类:
1 import com.bootdo.common.utils.ShiroUtils; 2 import org.apache.catalina.session.StandardSessionFacade; 3 import org.apache.shiro.session.Session; 4 import org.springframework.context.annotation.Bean; 5 import org.springframework.context.annotation.Configuration; 6 import org.springframework.web.socket.server.standard.ServerEndpointExporter; 7 8 import javax.servlet.http.HttpSession; 9 import javax.websocket.HandshakeResponse; 10 import javax.websocket.server.HandshakeRequest; 11 import javax.websocket.server.ServerEndpointConfig; 12 import javax.websocket.server.ServerEndpointConfig.Configurator; 13 14 @Configuration 15 public class WebSocketConfig extends Configurator { 16 17 @Override 18 public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { 19 /*如果没有监听器,那么这里获取到的HttpSession是null*/ 20 StandardSessionFacade ssf = (StandardSessionFacade) request.getHttpSession(); 21 if (ssf != null) { 22 HttpSession httpSession = (HttpSession) request.getHttpSession(); 23 //关键操作 24 sec.getUserProperties().put("sessionId", httpSession.getId()); 25 System.out.println("获取到的SessionID:" + httpSession.getId()); 26 } 27 } 28 29 /** 30 * 引入shiro框架下的session,获取session信息 31 */ 32 /* 33 @Override 34 public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { 35 Session shiroSession = ShiroUtils.getSubjct().getSession(); 36 sec.getUserProperties().put("sessionId", shiroSession.getId()); 37 } 38 */ 39 40 @Bean 41 public ServerEndpointExporter serverEndpointExporter() { 42 //这个对象说一下,貌似只有服务器是tomcat的时候才需要配置,具体我没有研究 43 return new ServerEndpointExporter(); 44 } 45 }
webSocket消息实现类方法:
1 import org.springframework.stereotype.Component; 2 3 import javax.websocket.*; 4 import javax.websocket.server.ServerEndpoint; 5 import java.io.IOException; 6 import java.util.concurrent.CopyOnWriteArraySet; 7 8 //configurator = WebsocketConfig.class 该属性就是我上面配置的信息 9 @ServerEndpoint(value = "/websocket", configurator = WebSocketConfig.class) 10 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理 11 public class WebSocket { 12 //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 13 private static int onlineCount = 0; 14 15 //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 16 private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>(); 17 18 //与某个客户端的连接会话,需要通过它来给客户端发送数据 19 private Session session; 20 21 /** 22 * 连接建立成功调用的方法 23 * <p> 24 * config用来获取WebsocketConfig中的配置信息 25 */ 26 @OnOpen 27 public void onOpen(Session session, EndpointConfig config) { 28 29 //获取WebsocketConfig.java中配置的“sessionId”信息值 30 String httpSessionId = (String) config.getUserProperties().get("sessionId"); 31 32 this.session = session; 33 webSocketSet.add(this); //加入set中 34 addOnlineCount(); //在线数加1 35 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); 36 try { 37 sendMessage("Hello world"); 38 } catch (IOException e) { 39 System.out.println("IO异常"); 40 } 41 } 42 43 /** 44 * 连接关闭调用的方法 45 */ 46 @OnClose 47 public void onClose() { 48 webSocketSet.remove(this); //从set中删除 49 subOnlineCount(); //在线数减1 50 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); 51 } 52 53 /** 54 * 收到客户端消息后调用的方法 55 * 56 * @param message 客户端发送过来的消息 57 */ 58 @OnMessage 59 public void onMessage(String message, Session session) { 60 System.out.println("来自客户端的消息:" + message); 61 62 //群发消息 63 for (WebSocket item : webSocketSet) { 64 try { 65 item.sendMessage(message); 66 } catch (IOException e) { 67 e.printStackTrace(); 68 } 69 } 70 } 71 72 /** 73 * 发生错误时调用 74 */ 75 @OnError 76 public void onError(Session session, Throwable error) { 77 System.out.println("发生错误"); 78 error.printStackTrace(); 79 } 80 81 82 public void sendMessage(String message) throws IOException { 83 this.session.getBasicRemote().sendText(message); 84 //this.session.getAsyncRemote().sendText(message); 85 } 86 87 88 /** 89 * 群发自定义消息 90 */ 91 public static void sendInfo(String message) throws IOException { 92 for (WebSocket item : webSocketSet) { 93 try { 94 item.sendMessage(message); 95 } catch (IOException e) { 96 continue; 97 } 98 } 99 } 100 101 public static synchronized int getOnlineCount() { 102 return onlineCount; 103 } 104 105 public static synchronized void addOnlineCount() { 106 WebSocket.onlineCount++; 107 } 108 109 public static synchronized void subOnlineCount() { 110 WebSocket.onlineCount--; 111 } 112 }
注意,有上面配置后,如果配置获取的信息为null,需加入监听实现类:
1 import org.springframework.stereotype.Component; 2 3 import javax.servlet.ServletRequestEvent; 4 import javax.servlet.ServletRequestListener; 5 import javax.servlet.http.HttpServletRequest; 6 import javax.servlet.http.HttpSession; 7 8 /** 9 * 监听器类:主要任务是用ServletRequest将我们的HttpSession携带过去 10 */ 11 @Component //此注解千万千万不要忘记,它的主要作用就是将这个监听器纳入到Spring容器中进行管理,相当于注册监听吧 12 public class RequestListener implements ServletRequestListener { 13 @Override 14 public void requestInitialized(ServletRequestEvent sre) { 15 //将所有request请求都携带上httpSession 16 HttpSession httpSession= ((HttpServletRequest) sre.getServletRequest()).getSession(); 17 System.out.println("将所有request请求都携带上httpSession " + httpSession.getId()); 18 } 19 20 public RequestListener() { 21 } 22 23 @Override 24 public void requestDestroyed(ServletRequestEvent arg0) { 25 } 26 }
以上是关于spring boot下WebSocket消息推送的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot + WebSocket 实时消息推送
Spring Boot整合WebSocket实现实时消息推送
Spring Boot整合WebSocket实现实时消息推送