SpringBoot整合Websocket实现即时聊天功能
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Websocket实现即时聊天功能相关的知识,希望对你有一定的参考价值。
参考技术A 近期,公司需要新增即时聊天的业务,于是用websocket 整合到Springboot完成业务的实现。一、我们来简单的介绍下websocket的交互原理:
1.客户端先服务端发起websocket请求;
2.服务端接收到请求之后,把请求响应返回给客户端;
3.客户端与服务端只需要一次握手即可完成交互通道;
二、webscoket支持的协议:基于TCP协议下,http协议和https协议;
http协议 springboot不需要做任何的配置
https协议则需要配置nignx代理,注意证书有效的问题 ---在这不做详细说明
三、开始我们的实现java后端的实现
1.添加依赖
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>$spring.version</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>$spring.version</version>
</dependency>
<!-- WebSocket -->
2.配置config
@ConditionalOnWebApplication
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer
@Bean
public ServerEndpointExporter serverEndpointExporter()
return new ServerEndpointExporter();
@Bean
public CustomSpringConfigurator customSpringConfigurator()
return new CustomSpringConfigurator();
@Override
protected void configureStompEndpoints(StompEndpointRegistry registry)
registry.addEndpoint("/websocket").setAllowedOrigins("*")
.addInterceptors(new HttpSessionHandshakeInterceptor()).withSockJS();
public class CustomSpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware
private static volatile BeanFactory context;
@Override
public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException
return context.getBean(clazz);
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
CustomSpringConfigurator.context = applicationContext;
@Override
public void modifyHandshake(ServerEndpointConfig sec,
HandshakeRequest request, HandshakeResponse response)
super.modifyHandshake(sec,request,response);
HttpSession httpSession=(HttpSession) request.getHttpSession();
if(httpSession!=null)
sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
@SpringBootApplication
@EnableCaching
@ComponentScan("com")
@EnableWebSocket
public class Application extends SpringBootServletInitializer
static final Logger logger = LoggerFactory.getLogger(Application.class);
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application)
return application.sources(Application.class);
需要注意的是: @EnableWebSocket 一定要加在启动类上,不然springboot无法对其扫描进行管理;
@SeverEndpoint --将目标类定义成一个websocket服务端,注解对应的值将用于监听用户连接的终端访问地址,客户端可以通过URL来连接到websocket服务端。
四、设计思路:用map<房间id, 用户set>来保存房间对应的用户连接列表,当有用户进入一个房间的时候,就会先检测房间是否存在,如果不存在那就新建一个空的用户set,再加入本身到这个set中,确保不同房间号里的用户session不串通!
/**
* Create by wushuyu
* on 2020/4/30 13:24
*
*/
@ServerEndpoint(value = "/websocket/roomName", configurator = CustomSpringConfigurator.class)
@Component
public class WebSocketRoom
//连接超时--一天
private static final long MAX_TIME_OUT = 24*60*60*1000;
// key为房间号,value为该房间号的用户session
private static final Map<String, Set<Session>> rooms = new ConcurrentHashMap<>();
//将用户的信息存储在一个map集合里
private static final Map<String, Object> users = new ConcurrentHashMap<>();
/**
*roomName 使用通用跳转,实现动态获取房间号和用户信息 格式:roomId|xx|xx
*/
@OnOpen
public void connect(@PathParam("roomName") String roomName, Session session)
String roomId = roomName.split("[|]")[0];
String nickname = roomName.split("[|]")[1];
String loginId = roomName.split("[|]")[2];
//设置连接超时时间
session.setMaxIdleTimeout(MAX_TIME_OUT);
try
//可实现业务逻辑
// 将session按照房间名来存储,将各个房间的用户隔离
if (!rooms.containsKey(roomId))
// 创建房间不存在时,创建房间
Set<Session> room = new HashSet<>();
// 添加用户
room.add(session);
rooms.put(roomId, room);
else // 房间已存在,直接添加用户到相应的房间
if (rooms.values().contains(session)) //如果房间里有此session直接不做操作
else //不存在则添加
rooms.get(roomId).add(session);
JSONObject jsonObject = new JSONObject();
-----
//根据自身业务情况实现业务
-----
users.put(session.getId(), jsonObject);
//向在线的人发送当前在线的人的列表 -------------可有可无,看业务需求
List<ChatMessage> userList = new LinkedList<>();
rooms.get(roomId)
.stream()
.map(Session::getId)
.forEach(s ->
ChatMessage chatMessage = new ChatMessage();
chatMessage.setDate(new Date());
chatMessage.setStatus(1);
chatMessage.setChatContent(users.get(s));
chatMessage.setMessage("");
userList.add(chatMessage);
);
// session.getBasicRemote().sendText(JSON.toJSONString(userList));
//向房间的所有人群发谁上线了
ChatMessage chatMessage = new ChatMessage(); ----将聊天信息封装起来。
chatMessage.setDate(new Date());
chatMessage.setStatus(1);
chatMessage.setChatContent(users.get(session.getId()));
chatMessage.setMessage("");
broadcast(roomId, JSON.toJSONString(chatMessage));
broadcast(roomId, JSON.toJSONString(userList));
catch (Exception e)
e.printStackTrace();
@OnClose
public void disConnect(@PathParam("roomName") String roomName, Session session)
String roomId = roomName.split("[|]")[0];
String loginId = roomName.split("[|]")[2];
try
rooms.get(roomId).remove(session);
ChatMessage chatMessage = new ChatMessage();
chatMessage.setDate(new Date());
chatMessage.setUserName(user.getRealname());
chatMessage.setStatus(0);
chatMessage.setChatContent(users.get(session.getId()));
chatMessage.setMessage("");
users.remove(session.getId());
//向在线的人发送当前在线的人的列表 ----可有可无,根据业务要求
List<ChatMessage> userList = new LinkedList<>();
rooms.get(roomId)
.stream()
.map(Session::getId)
.forEach(s ->
ChatMessage chatMessage1 = new ChatMessage();
chatMessage1.setDate(new Date());
chatMessage1.setUserName(user.getRealname());
chatMessage1.setStatus(1);
chatMessage1.setChatContent(users.get(s));
chatMessage1.setMessage("");
userList.add(chatMessage1);
);
broadcast(roomId, JSON.toJSONString(chatMessage));
broadcast(roomId, JSON.toJSONString(userList));
catch (Exception e)
e.printStackTrace();
@OnMessage
public void receiveMsg( String msg, Session session)
try
ChatMessage chatMessage = new ChatMessage();
chatMessage.setUserName(user.getRealname());
chatMessage.setStatus(2);
chatMessage.setChatContent(users.get(session.getId()));
chatMessage.setMessage(msg);
// 按房间群发消息
broadcast(roomId, JSON.toJSONString(chatMessage));
catch (IOException e)
e.printStackTrace();
// 按照房间名进行群发消息
private void broadcast(String roomId, String msg)
rooms.get(roomId).forEach(s ->
try
s.getBasicRemote().sendText(msg); -----此还有一个getAsyncRemote()
catch (IOException e)
e.printStackTrace();
);
@OnError
public void onError(Throwable error)
error.printStackTrace();
友情提示:此session是websocket里的session,并非httpsession;
springboot整合websocket
一、背景
??我们都知道http协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息。想要实现浏览器的主动推送有两种主流实现方式:
- 轮询:缺点很多,但是实现简单
- websocket:在浏览器和服务器之间建立tcp连接,实现全双工通信
??springboot使用websocket有两种方式,一种是实现简单的websocket,另外一种是实现STOMP协议。这一篇实现简单的websocket,STOMP下一篇在讲。
注意:如下都是针对使用springboot内置容器
二、实现
1、依赖引入
??要使用websocket关键是@ServerEndpoint
这个注解,该注解是javaee标准中的注解,tomcat7及以上已经实现了,如果使用传统方法将war包部署到tomcat中,只需要引入如下javaee标准依赖即可:
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
<scope>provided</scope>
</dependency>
如使用springboot内置容器,无需引入,springboot已经做了包含。我们只需引入如下依赖即可:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.5.3.RELEASE</version>
<type>pom</type>
</dependency>
2、注入Bean
??首先注入一个ServerEndpointExporterBean,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint。代码如下:
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
3、申明endpoint
??建立MyWebSocket.java类,在该类中处理websocket逻辑
@ServerEndpoint(value = "/websocket") //接受websocket请求路径
@Component //注册到spring容器中
public class MyWebSocket {
//保存所有在线socket连接
private static Map<String,MyWebSocket> webSocketMap = new LinkedHashMap<>();
//记录当前在线数目
private static int count=0;
//当前连接(每个websocket连入都会创建一个MyWebSocket实例
private Session session;
private Logger log = LoggerFactory.getLogger(this.getClass());
//处理连接建立
@OnOpen
public void onOpen(Session session){
this.session=session;
webSocketMap.put(session.getId(),this);
addCount();
log.info("新的连接加入:{}",session.getId());
}
//接受消息
@OnMessage
public void onMessage(String message,Session session){
log.info("收到客户端{}消息:{}",session.getId(),message);
try{
this.sendMessage("收到消息:"+message);
}catch (Exception e){
e.printStackTrace();
}
}
//处理错误
@OnError
public void onError(Throwable error,Session session){
log.info("发生错误{},{}",session.getId(),error.getMessage());
}
//处理连接关闭
@OnClose
public void onClose(){
webSocketMap.remove(this.session.getId());
reduceCount();
log.info("连接关闭:{}",this.session.getId());
}
//群发消息
//发送消息
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
//广播消息
public static void broadcast(){
MyWebSocket.webSocketMap.forEach((k,v)->{
try{
v.sendMessage("这是一条测试广播");
}catch (Exception e){
}
});
}
//获取在线连接数目
public static int getCount(){
return count;
}
//操作count,使用synchronized确保线程安全
public static synchronized void addCount(){
MyWebSocket.count++;
}
public static synchronized void reduceCount(){
MyWebSocket.count--;
}
}
4、客户的实现
??客户端使用h5原生websocket,部分浏览器可能不支持。代码如下:
<html>
<head>
<title>websocket测试</title>
<meta charset="utf-8">
</head>
<body>
<button onclick="sendMessage()">测试</button>
<script>
let socket = new WebSocket("ws://localhost:8080/websocket");
socket.onerror = err => {
console.log(err);
};
socket.onopen = event => {
console.log(event);
};
socket.onmessage = mess => {
console.log(mess);
};
socket.onclose = () => {
console.log("连接关闭");
};
function sendMessage() {
if (socket.readyState === 1)
socket.send("这是一个测试数据");
else
alert("尚未建立websocket连接");
}
</script>
</body>
</html>
三、测试
??建立一个controller测试群发,代码如下:
@RestController
public class HomeController {
@GetMapping("/broadcast")
public void broadcast(){
MyWebSocket.broadcast();
}
}
然后打开上面的html,可以看到浏览器和服务器都输出连接成功的信息:
浏览器:
Event {isTrusted: true, type: "open", target: WebSocket, currentTarget: WebSocket, eventPhase: 2, …}
服务端:
2018-08-01 14:05:34.727 INFO 12708 --- [nio-8080-exec-1] com.fxb.h5websocket.MyWebSocket : 新的连接加入:0
点击测试按钮,可在服务端看到如下输出:
2018-08-01 15:00:34.644 INFO 12708 --- [nio-8080-exec-6] com.fxb.h5websocket.MyWebSocket : 收到客户端2消息:这是一个测试数据
再次打开html页面,这样就有两个websocket客户端,然后在浏览器访问localhost:8080/broadcast测试群发功能,每个客户端都会输出如下信息:
MessageEvent {isTrusted: true, data: "这是一条测试广播", origin: "ws://localhost:8080", lastEventId: "", source: null, …}
??源码可在github上下载,记得点赞,star哦
以上是关于SpringBoot整合Websocket实现即时聊天功能的主要内容,如果未能解决你的问题,请参考以下文章
springboot整合websocket实现一对一消息推送和广播消息推送