Spring Boot整合WebSocket实现实时消息推送
Posted dkbnull
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot整合WebSocket实现实时消息推送相关的知识,希望对你有一定的参考价值。
0. 开发环境
- JDK:1.8
- Spring Boot:2.1.1.RELEASE
1. 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 新建WebSocket配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig
/**
* ServerEndpointExporter注入
* 该Bean会自动注册使用@ServerEndpoint注解申明的WebSocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter()
return new ServerEndpointExporter();
3. 新建WebSocket服务类
package cn.wbnull.springbootdemo.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint("/websocket/terminalId")
public class WebSocketService
private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
/**
* 保存连接信息
*/
private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception
logger.info(session.getRequestURI().getPath() + ",打开连接开始:" + session.getId());
//当前连接已存在,关闭
if (CLIENTS.containsKey(terminalId))
onClose(CLIENTS.get(terminalId));
CLIENTS.put(terminalId, session);
logger.info(session.getRequestURI().getPath() + ",打开连接完成:" + session.getId());
@OnClose
public void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception
logger.info(session.getRequestURI().getPath() + ",关闭连接开始:" + session.getId());
CLIENTS.remove(terminalId);
logger.info(session.getRequestURI().getPath() + ",关闭连接完成:" + session.getId());
@OnMessage
public void onMessage(String message, Session session)
logger.info("前台发送消息:" + message);
@OnError
public void onError(Session session, Throwable error)
logger.error(error.toString());
public void onClose(Session session)
//判断当前连接是否在线
// if (!session.isOpen())
// return;
//
try
session.close();
catch (IOException e)
logger.error("金斗云关闭连接异常:" + e);
public void sendMessage(String message, Session session)
try
session.getAsyncRemote().sendText(message);
logger.info("推送成功:" + message);
catch (Exception e)
logger.error("推送异常:" + e);
public boolean sendMessage(String terminalId, String message)
try
Session session = CLIENTS.get(terminalId);
session.getAsyncRemote().sendText(message);
logger.info("推送成功:" + message);
return true;
catch (Exception e)
logger.error("推送异常:" + e);
return false;
4. 新建Web端连接页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket Test</title>
</head>
<body>
<label for="text"></label><input id="text" type="text"/>
<button onclick="sendMessage()">发送消息</button>
<button onclick="closeWebSocket()">关闭连接</button>
<div id="message"></div>
</body>
<script type="text/javascript">
let websocket = null;
//判断当前浏览器是否支持WebSocket
if ('WebSocket' in window)
websocket = new WebSocket("ws://localhost:8090/websocket/123456");
else
alert('Not support websocket');
//连接发生错误的回调方法
websocket.onerror = function ()
showMessage("打开连接失败");
;
//连接成功建立的回调方法
websocket.onopen = function (event)
showMessage("打开连接成功");
//接收到消息的回调方法
websocket.onmessage = function (event)
showMessage(event.data);
//连接关闭的回调方法
websocket.onclose = function ()
showMessage("关闭连接成功");
//监听窗口关闭事件,当窗口关闭时,主动关闭WebSocket连接
window.onbeforeunload = function ()
websocket.close();
function showMessage(message)
document.getElementById('message').innerHTML += message + '<br/>';
function closeWebSocket()
websocket.close();
//发送消息
function sendMessage()
const message = document.getElementById('text').value;
websocket.send(message);
</script>
</html>
5. 设置后端消息推送
为便于测试,直接修改WebSocketService,当收到前台消息时,直接给前台发送消息
@OnMessage
public void onMessage(String message, Session session)
logger.info("前台发送消息:" + message);
sendMessage("收到消息:" + message, session);
6. 测试
浏览器访问http://127.0.0.1:8090/,页面显示如下:
后台日志如下:
2022-07-02 21:03:22.753 INFO 9168 --- [nio-8090-exec-2] c.w.s.service.WebSocketService : /websocket/123456,打开连接开始:0
2022-07-02 21:03:22.753 INFO 9168 --- [nio-8090-exec-2] c.w.s.service.WebSocketService : /websocket/123456,打开连接完成:0
当我们浏览器打开第二个页面,第一个页面会显示断开连接
后台日志如下:
2022-07-02 21:04:01.936 INFO 9168 --- [nio-8090-exec-4] c.w.s.service.WebSocketService : /websocket/123456,打开连接开始:1
2022-07-02 21:04:01.938 INFO 9168 --- [nio-8090-exec-4] c.w.s.service.WebSocketService : /websocket/123456,关闭连接开始:0
2022-07-02 21:04:01.938 INFO 9168 --- [nio-8090-exec-4] c.w.s.service.WebSocketService : /websocket/123456,关闭连接完成:0
2022-07-02 21:04:01.938 INFO 9168 --- [nio-8090-exec-4] c.w.s.service.WebSocketService : /websocket/123456,打开连接完成:1
前台页面录入消息并发送,后台可正常收到消息
2022-07-02 21:08:21.860 INFO 1772 --- [nio-8090-exec-3] c.w.s.service.WebSocketService : 前台发送消息:测试消息
2022-07-02 21:08:21.863 INFO 1772 --- [nio-8090-exec-3] c.w.s.service.WebSocketService : 推送成功:收到消息:测试消息
前台页面显示正常
点击关闭连接,可以正常关闭
7. 服务保活
对于实时消息推送相关服务,都存在保活问题。这里我们采用心跳保活。 修改WebSocketService类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@ServerEndpoint("/websocket/terminalId")
public class WebSocketService
private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);
/**
* 保存连接信息
*/
private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();
private static final Map<String, AtomicInteger> TERMINAL_IDS = new HashMap<>();
@OnOpen
public void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception
logger.info(session.getRequestURI().getPath() + ",打开连接开始:" + session.getId());
//当前连接已存在,关闭
if (CLIENTS.containsKey(terminalId))
onClose(CLIENTS.get(terminalId));
TERMINAL_IDS.put(terminalId, new AtomicInteger(0));
CLIENTS.put(terminalId, session);
logger.info(session.getRequestURI().getPath() + ",打开连接完成:" + session.getId());
@OnClose
public void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception
logger.info(session.getRequestURI().getPath() + ",关闭连接开始:" + session.getId());
CLIENTS.remove(terminalId);
TERMINAL_IDS.remove(terminalId);
logger.info(session.getRequestURI().getPath() + ",关闭连接完成:" + session.getId());
@OnMessage
public void onMessage(String message, Session session)
logger.info("前台发送消息:" + message);
if ("心跳".equals(message))
//重置当前终端心跳次数
TERMINAL_IDS.get(message).set(0);
return;
sendMessage("收到消息:" + message, session);
@OnError
public void onError(Session session, Throwable error)
logger.error(error.toString());
public void onClose(Session session)
//判断当前连接是否在线
// if (!session.isOpen())
// return;
//
try
session.close();
catch (IOException e)
logger.error("金斗云关闭连接异常:" + e);
public void heartbeat()
//检查所有终端心跳次数
for (String key : TERMINAL_IDS.keySet())
//心跳3次及以上的主动断开
if ((TERMINAL_IDS.get(key).intValue() >= 3))
logger.info("心跳超时,关闭连接:" + key);
onClose(CLIENTS.get(key));
for (String key : CLIENTS.keySet())
//记录当前终端心跳次数
TERMINAL_IDS.get(key).incrementAndGet();
sendMessage("心跳", CLIENTS.get(key));
public void sendMessage(String message, Session session)
try
session.getAsyncRemote().sendText(message);
logger.info("推送成功:" + message);
catch (Exception e)
logger.error("推送异常:" + e);
public boolean sendMessage(String terminalId, String message)
try
Session session = CLIENTS.get(terminalId);
session.getAsyncRemote().sendText(message);
logger.info("推送成功:" + message);
return true;
catch (Exception e)
logger.error("推送异常:" + e);
return false;
8. 新增定时任务
新增定时任务,定时给连接到WebSocket的终端发送消息,超过指定次数未回应的终端视为离线,主动关闭连接。
import cn.wbnull.springbootdemo.serviceSpring Boot整合WebSocket实现实时消息推送
Spring Boot整合WebSocket实现实时消息推送
Spring Boot2 系列教程 (十七) | 整合 WebSocket 实现聊天室
SpringBoot 整合WebSocket 实现简单聊天室